diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 6 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 186 |
3 files changed, 133 insertions, 61 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 2be2c89ec..bf92b0822 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -101,7 +101,7 @@ SET_SOURCE_FILES_PROPERTIES( LIBUHD_APPEND_SOURCES( ${CMAKE_SOURCE_DIR}/lib/transport/if_addrs.cpp ${CMAKE_SOURCE_DIR}/lib/transport/udp_simple.cpp - #${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp ${CMAKE_SOURCE_DIR}/lib/transport/vrt_packet_handler.hpp ${CMAKE_SOURCE_DIR}/lib/transport/zero_copy.cpp ) diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index e1cc8398c..c819302b6 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -277,7 +277,7 @@ libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ /*********************************************************************** * USB zero_copy device class **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy { +class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> { public: typedef boost::shared_ptr<libusb_zero_copy_impl> sptr; @@ -378,7 +378,7 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ else { return managed_recv_buffer::make_safe( boost::asio::const_buffer(lut->buffer, lut->actual_length), - boost::bind(&libusb_zero_copy_impl::release, this, lut) + boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) ); } } @@ -398,7 +398,7 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ else { return managed_send_buffer::make_safe( boost::asio::mutable_buffer(lut->buffer, _send_xfer_size), - boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1) + boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) ); } } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 3130830a5..70e7514a1 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -17,20 +17,23 @@ #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/transport/udp_simple.hpp> //mtu +#include <uhd/transport/bounded_buffer.hpp> #include <uhd/utils/assert.hpp> #include <uhd/utils/warning.hpp> -#include <boost/cstdint.hpp> +#include <boost/shared_array.hpp> #include <boost/asio.hpp> #include <boost/format.hpp> +#include <boost/thread.hpp> #include <iostream> using namespace uhd::transport; +namespace asio = boost::asio; /*********************************************************************** * Constants **********************************************************************/ //enough buffering for half a second of samples at full rate on usrp2 -static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 25e6 * 0.5); +static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); //Large buffers cause more underflow at high rates. //Perhaps this is due to the kernel scheduling, //but may change with host-based flow control. @@ -43,39 +46,55 @@ static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class udp_zero_copy_impl: - public phony_zero_copy_recv_if, - public phony_zero_copy_send_if, - public udp_zero_copy -{ +class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> { public: - typedef boost::shared_ptr<udp_zero_copy_impl> sptr; - - udp_zero_copy_impl( - const std::string &addr, - const std::string &port - ): - phony_zero_copy_recv_if(udp_simple::mtu), - phony_zero_copy_send_if(udp_simple::mtu) - { + typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; + + udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){ //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); // create, open, and connect the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); + _socket = new asio::ip::udp::socket(_io_service); + _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); - _sock_fd = _socket->native(); } - ~udp_zero_copy_impl(void){ + ~udp_zero_copy_asio_impl(void){ + _io_service.stop(); + _thread_group.join_all(); delete _socket; } + /*! + * Init, the second contructor: + * Allocate memory and spwan service thread. + */ + void init(void){ + //allocate all recv frames and release them to begin xfers + _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames()); + for (size_t i = 0; i < this->get_num_recv_frames(); i++){ + boost::shared_array<char> buff(new char[udp_simple::mtu]); + _buffers.push_back(buff); //store a reference to this shared array + release(buff.get()); + } + + //allocate all send frames and push them into the fifo + _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames()); + for (size_t i = 0; i < this->get_num_send_frames(); i++){ + boost::shared_array<char> buff(new char[udp_simple::mtu]); + _buffers.push_back(buff); //store a reference to this shared array + handle_send(buff.get()); + } + + //spawn the service thread that will run the io service + _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this)); + } + //get size for internal socket buffer template <typename Opt> size_t get_buff_size(void) const{ Opt option; @@ -90,60 +109,111 @@ public: return get_buff_size<Opt>(); } - //The number of frames is approximately the buffer size divided by the max datagram size. //In reality, this is a phony zero-copy interface and the number of frames is infinite. //However, its sensible to advertise a frame count that is approximate to buffer size. //This way, the transport caller will have an idea about how much buffering to create. size_t get_num_recv_frames(void) const{ - return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/udp_simple::mtu; + return this->get_buff_size<asio::socket_base::receive_buffer_size>()/udp_simple::mtu; } size_t get_num_send_frames(void) const{ - return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/udp_simple::mtu; + return this->get_buff_size<asio::socket_base::send_buffer_size>()/udp_simple::mtu; + } + + //! pop a filled recv buffer off of the fifo and bind with the release callback + managed_recv_buffer::sptr get_recv_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_recv_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::release, + shared_from_this(), + asio::buffer_cast<void*>(buff) + ) + ); + } + return managed_recv_buffer::sptr(); + } + + //! pop an empty send buffer off of the fifo and bind with the commit callback + managed_send_buffer::sptr get_send_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_send_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::commit, + shared_from_this(), + asio::buffer_cast<void*>(buff), _1 + ) + ); + } + return managed_send_buffer::sptr(); } private: - boost::asio::ip::udp::socket *_socket; - boost::asio::io_service _io_service; - int _sock_fd; - - ssize_t recv(const boost::asio::mutable_buffer &buff, double timeout){ - //setup timeval for timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = long(timeout*1e6); - - //setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(_sock_fd, &rset); - - //call select to perform timed wait - if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0; - - return ::recv( - _sock_fd, - boost::asio::buffer_cast<char *>(buff), - boost::asio::buffer_size(buff), 0 + void service(void){ + _io_service.run(); + } + + /******************************************************************* + * The async send and receive callbacks + ******************************************************************/ + + //! handle a recv callback -> push the filled memory into the fifo + void handle_recv(void *mem, size_t len){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); + } + + //! release a recv buffer -> start an async recv on the buffer + void release(void *mem){ + _socket->async_receive( + boost::asio::buffer(mem, udp_simple::mtu), + boost::bind( + &udp_zero_copy_asio_impl::handle_recv, + shared_from_this(), mem, + asio::placeholders::bytes_transferred + ) ); } - ssize_t send(const boost::asio::const_buffer &buff){ - return ::send( - _sock_fd, - boost::asio::buffer_cast<const char *>(buff), - boost::asio::buffer_size(buff), 0 + //! handle a send callback -> push the emptied memory into the fifo + void handle_send(void *mem){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu)); + } + + //! commit a send buffer -> start an async send on the buffer + void commit(void *mem, size_t len){ + _socket->async_send( + boost::asio::buffer(mem, len), + boost::bind( + &udp_zero_copy_asio_impl::handle_send, + shared_from_this(), mem + ) ); } + + //asio guts -> socket and service + asio::ip::udp::socket *_socket; + asio::io_service _io_service; + + //memory management -> buffers and fifos + boost::thread_group _thread_group; + std::vector<boost::shared_array<char> > _buffers; + typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type; + pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; }; /*********************************************************************** * UDP zero copy make function **********************************************************************/ template<typename Opt> static void resize_buff_helper( - udp_zero_copy_impl::sptr udp_trans, + udp_zero_copy_asio_impl::sptr udp_trans, size_t target_size, const std::string &name ){ @@ -183,11 +253,13 @@ udp_zero_copy::sptr udp_zero_copy::make( size_t recv_buff_size, size_t send_buff_size ){ - udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port)); + udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port)); //call the helper to resize send and recv buffers - resize_buff_helper<boost::asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); - resize_buff_helper<boost::asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send"); + resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); + resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send"); + + udp_trans->init(); //buffers resized -> call init() to use return udp_trans; } |