From 2d9838f467013d5397b6daf83afb5ccea92065a4 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 20 Feb 2011 01:13:03 -0800 Subject: udp: update docs for transport, create common header for wait implementation Reimplemented simple udp transport with one impl class. Moved wait for ready/select implementation into common header. Important note on select, timeval should have usecs < 1 second or it may error on some platforms. Fixed in this implementation. --- host/lib/transport/CMakeLists.txt | 2 +- host/lib/transport/udp_common.hpp | 51 +++++ host/lib/transport/udp_simple.cpp | 160 ++++----------- host/lib/transport/udp_zero_copy.cpp | 304 ++++++++++++++++++++++++++++ host/lib/transport/udp_zero_copy_asio.cpp | 319 ------------------------------ 5 files changed, 392 insertions(+), 444 deletions(-) create mode 100644 host/lib/transport/udp_common.hpp create mode 100644 host/lib/transport/udp_zero_copy.cpp delete mode 100644 host/lib/transport/udp_zero_copy_asio.cpp (limited to 'host/lib/transport') diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index a98bcc14e..e58957154 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -80,6 +80,6 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy_asio.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/vrt_packet_handler.hpp ) diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp new file mode 100644 index 000000000..44067b5dc --- /dev/null +++ b/host/lib/transport/udp_common.hpp @@ -0,0 +1,51 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP + +#include +#include + +namespace uhd{ namespace transport{ + + /*! + * Wait for the socket to become ready for a receive operation. + * \param sock_fd the open socket file descriptor + * \param timeout the timeout duration in seconds + * \return true when the socket is ready for receive + */ + UHD_INLINE bool wait_for_recv(int sock_fd, double timeout){ + //setup timeval for timeout + timeval tv; + //If the tv_usec > 1 second on some platforms, select will + //error EINVAL: An invalid timeout interval was specified. + tv.tv_sec = int(timeout); + tv.tv_usec = int(timeout*1000000)%1000000; + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(sock_fd, &rset); + + //call select with timeout on receive socket + return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0; + } + +}} //namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 6799ac7b2..094f570ff 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -15,159 +15,71 @@ // along with this program. If not, see . // +#include "udp_common.hpp" #include -#include -#include #include #include using namespace uhd::transport; +namespace asio = boost::asio; /*********************************************************************** - * Helper Functions + * UDP simple implementation: connected and broadcast **********************************************************************/ -/*! - * Wait for available data or timeout. - * \param socket the asio socket - * \param timeout the timeout in seconds - * \return false for timeout, true for data - */ -static bool wait_available( - boost::asio::ip::udp::socket &socket, double timeout -){ - #if defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) - - //setup timeval for timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = long(timeout*1e6); - - //setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(socket.native(), &rset); - - return ::select(socket.native()+1, &rset, NULL, NULL, &tv) > 0; - - #else /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ - - //FIXME: why does select fail on macintosh? - for (size_t i = 0; i < size_t(timeout*1e3); i++){ - if (socket.available()) return true; - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } - return false; - - #endif /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ -} - -/*********************************************************************** - * UDP connected implementation class - **********************************************************************/ -class udp_connected_impl : public udp_simple{ +class udp_simple_impl : public udp_simple{ public: - //structors - udp_connected_impl(const std::string &addr, const std::string &port); - ~udp_connected_impl(void); + typedef boost::shared_ptr socket_sptr; - //send/recv - size_t send(const boost::asio::const_buffer &); - size_t recv(const boost::asio::mutable_buffer &, double); + udp_simple_impl( + const std::string &addr, const std::string &port, bool bcast, bool connect + ):_connected(connect){ + //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; -private: - boost::asio::ip::udp::socket *_socket; - boost::asio::io_service _io_service; -}; + //resolve the address + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + _receiver_endpoint = *resolver.resolve(query); -udp_connected_impl::udp_connected_impl(const std::string &addr, const std::string &port){ - //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + //create and open the socket + _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); + _socket->open(asio::ip::udp::v4()); - // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + //allow broadcasting + _socket->set_option(asio::socket_base::broadcast(bcast)); - // Create, open, and connect the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); - _socket->connect(receiver_endpoint); -} - -udp_connected_impl::~udp_connected_impl(void){ - delete _socket; -} - -size_t udp_connected_impl::send(const boost::asio::const_buffer &buff){ - return _socket->send(boost::asio::buffer(buff)); -} + //connect the socket + if (connect) _socket->connect(_receiver_endpoint); -size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ - if (not wait_available(*_socket, timeout)) return 0; - return _socket->receive(boost::asio::buffer(buff)); -} + } -/*********************************************************************** - * UDP broadcast implementation class - **********************************************************************/ -class udp_broadcast_impl : public udp_simple{ -public: - //structors - udp_broadcast_impl(const std::string &addr, const std::string &port); - ~udp_broadcast_impl(void); + size_t send(const asio::const_buffer &buff){ + if (_connected) return _socket->send(asio::buffer(buff)); + return _socket->send_to(asio::buffer(buff), _receiver_endpoint); + } - //send/recv - size_t send(const boost::asio::const_buffer &); - size_t recv(const boost::asio::mutable_buffer &, double); + size_t recv(const asio::mutable_buffer &buff, double timeout){ + if (not wait_for_recv(_socket->native(), timeout)) return 0; + return _socket->receive(asio::buffer(buff)); + } private: - boost::asio::ip::udp::socket *_socket; - boost::asio::ip::udp::endpoint _receiver_endpoint; - boost::asio::io_service _io_service; + bool _connected; + asio::io_service _io_service; + socket_sptr _socket; + asio::ip::udp::endpoint _receiver_endpoint; }; -udp_broadcast_impl::udp_broadcast_impl(const std::string &addr, const std::string &port){ - //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - - // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - _receiver_endpoint = *resolver.resolve(query); - - // Create and open the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); - - // Allow broadcasting - boost::asio::socket_base::broadcast option(true); - _socket->set_option(option); - -} - -udp_broadcast_impl::~udp_broadcast_impl(void){ - delete _socket; -} - -size_t udp_broadcast_impl::send(const boost::asio::const_buffer &buff){ - return _socket->send_to(boost::asio::buffer(buff), _receiver_endpoint); -} - -size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ - if (not wait_available(*_socket, timeout)) return 0; - boost::asio::ip::udp::endpoint sender_endpoint; - return _socket->receive_from(boost::asio::buffer(buff), sender_endpoint); -} - /*********************************************************************** * UDP public make functions **********************************************************************/ udp_simple::sptr udp_simple::make_connected( const std::string &addr, const std::string &port ){ - return sptr(new udp_connected_impl(addr, port)); + return sptr(new udp_simple_impl(addr, port, false, true /* no bcast, connect */)); } udp_simple::sptr udp_simple::make_broadcast( const std::string &addr, const std::string &port ){ - return sptr(new udp_broadcast_impl(addr, port)); + return sptr(new udp_simple_impl(addr, port, true, false /* bcast, no connect */)); } diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp new file mode 100644 index 000000000..793fc6fba --- /dev/null +++ b/host/lib/transport/udp_zero_copy.cpp @@ -0,0 +1,304 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#include "udp_common.hpp" +#include +#include //mtu +#include +#include +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::transport; +namespace asio = boost::asio; + +//A reasonable number of frames for send/recv and async/sync +static const size_t DEFAULT_NUM_FRAMES = 32; + +/*********************************************************************** + * Reusable managed receiver buffer: + * - Initialize with memory and a release callback. + * - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +public: + typedef boost::function release_cb_type; + + udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): + _mem(mem), _len(0), _release_cb(release_cb){/* NOP */} + + void release(void){ + if (_len == 0) return; + this->_release_cb(this); + _len = 0; + } + + sptr get_new(size_t len){ + _len = len; + return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter); + } + + template T cast(void) const{return static_cast(_mem);} + +private: + static void fake_deleter(void *obj){ + static_cast(obj)->release(); + } + + const void *get_buff(void) const{return _mem;} + size_t get_size(void) const{return _len;} + + void *_mem; + size_t _len; + release_cb_type _release_cb; +}; + +/*********************************************************************** + * Reusable managed send buffer: + * - Initialize with memory and a commit callback. + * - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_msb : public managed_send_buffer{ +public: + typedef boost::function commit_cb_type; + + udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): + _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */} + + void commit(size_t len){ + if (_len == 0) return; + this->_commit_cb(this, len); + _len = 0; + } + + sptr get_new(size_t len){ + _len = len; + return sptr(this, &udp_zero_copy_asio_msb::fake_deleter); + } + +private: + static void fake_deleter(void *obj){ + static_cast(obj)->commit(0); + } + + void *get_buff(void) const{return _mem;} + size_t get_size(void) const{return _len;} + + void *_mem; + size_t _len; + commit_cb_type _commit_cb; +}; + +/*********************************************************************** + * Zero Copy UDP implementation with ASIO: + * This is the portable zero copy implementation for systems + * where a faster, platform specific solution is not available. + * However, it is not a true zero copy implementation as each + * send and recv requires a copy operation to/from userspace. + **********************************************************************/ +class udp_zero_copy_asio_impl : public udp_zero_copy{ +public: + typedef boost::shared_ptr sptr; + + udp_zero_copy_asio_impl( + const std::string &addr, + const std::string &port, + const device_addr_t &hints + ): + _recv_frame_size(size_t(hints.cast("recv_frame_size", udp_simple::mtu))), + _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_FRAMES))), + _send_frame_size(size_t(hints.cast("send_frame_size", udp_simple::mtu))), + _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_FRAMES))), + _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), + _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), + _pending_recv_buffs(_num_recv_frames), + _pending_send_buffs(_num_send_frames) + { + //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + + //resolve the address + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + + //create, open, and connect the socket + _socket = new asio::ip::udp::socket(_io_service); + _socket->open(asio::ip::udp::v4()); + _socket->connect(receiver_endpoint); + _sock_fd = _socket->native(); + + //allocate re-usable managed receive buffers + for (size_t i = 0; i < get_num_recv_frames(); i++){ + _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), + boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) + ); + handle_recv(&_mrb_pool.back()); + } + + //allocate re-usable managed send buffers + for (size_t i = 0; i < get_num_send_frames(); i++){ + _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i), + boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) + ); + handle_send(&_msb_pool.back()); + } + } + + ~udp_zero_copy_asio_impl(void){ + delete _socket; + } + + //get size for internal socket buffer + template size_t get_buff_size(void) const{ + Opt option; + _socket->get_option(option); + return option.value(); + } + + //set size for internal socket buffer + template size_t resize_buff(size_t num_bytes){ + Opt option(num_bytes); + _socket->set_option(option); + return get_buff_size(); + } + + /******************************************************************* + * Receive implementation: + * + * Use select to perform a blocking receive with timeout. + * Return the managed receive buffer with the new length. + * When the caller is finished with the managed buffer, + * the managed receive buffer is released back into the queue. + ******************************************************************/ + managed_recv_buffer::sptr get_recv_buff(double timeout){ + udp_zero_copy_asio_mrb *mrb = NULL; + bool recv_ready = wait_for_recv(_sock_fd, timeout); + if (recv_ready and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ + return mrb->get_new(::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0)); + } + return managed_recv_buffer::sptr(); + } + + UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ + _pending_recv_buffs.push_with_haste(mrb); + } + + void release(udp_zero_copy_asio_mrb *mrb){ + handle_recv(mrb); + } + + size_t get_num_recv_frames(void) const {return _num_recv_frames;} + size_t get_recv_frame_size(void) const {return _recv_frame_size;} + + /******************************************************************* + * Send implementation: + * + * Get a managed receive buffer immediately with max length set. + * The caller will fill the buffer and commit it when finished. + * The commit routine will perform a blocking send operation, + * and push the managed send buffer back into the queue. + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double timeout){ + udp_zero_copy_asio_msb *msb = NULL; + if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ + return msb->get_new(_send_frame_size); + } + return managed_send_buffer::sptr(); + } + + UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ + _pending_send_buffs.push_with_haste(msb); + } + + void commit(udp_zero_copy_asio_msb *msb, size_t len){ + ::send(_sock_fd, msb->cast(), len, 0); + handle_send(msb); + } + + size_t get_num_send_frames(void) const {return _num_send_frames;} + size_t get_send_frame_size(void) const {return _send_frame_size;} + +private: + //memory management -> buffers and fifos + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; + buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; + bounded_buffer _pending_recv_buffs; + bounded_buffer _pending_send_buffs; + std::list _msb_pool; + std::list _mrb_pool; + + //asio guts -> socket and service + asio::io_service _io_service; + asio::ip::udp::socket *_socket; + int _sock_fd; +}; + +/*********************************************************************** + * UDP zero copy make function + **********************************************************************/ +template static void resize_buff_helper( + udp_zero_copy_asio_impl::sptr udp_trans, + const size_t target_size, + const std::string &name +){ + std::string help_message; + #if defined(UHD_PLATFORM_LINUX) + help_message = str(boost::format( + "Please run: sudo sysctl -w net.core.%smem_max=%d\n" + ) % ((name == "recv")?"r":"w") % target_size); + #endif /*defined(UHD_PLATFORM_LINUX)*/ + + //resize the buffer if size was provided + if (target_size > 0){ + size_t actual_size = udp_trans->resize_buff(target_size); + if (target_size != actual_size) std::cout << boost::format( + "Target %s sock buff size: %d bytes\n" + "Actual %s sock buff size: %d bytes" + ) % name % target_size % name % actual_size << std::endl; + else std::cout << boost::format( + "Current %s sock buff size: %d bytes" + ) % name % actual_size << std::endl; + if (actual_size < target_size) uhd::warning::post(str(boost::format( + "The %s buffer could not be resized sufficiently.\n" + "See the transport application notes on buffer resizing.\n%s" + ) % name % help_message)); + } +} + +udp_zero_copy::sptr udp_zero_copy::make( + const std::string &addr, + const std::string &port, + const device_addr_t &hints +){ + udp_zero_copy_asio_impl::sptr udp_trans( + new udp_zero_copy_asio_impl(addr, port, hints) + ); + + //extract buffer size hints from the device addr + size_t recv_buff_size = size_t(hints.cast("recv_buff_size", 0.0)); + size_t send_buff_size = size_t(hints.cast("send_buff_size", 0.0)); + + //call the helper to resize send and recv buffers + resize_buff_helper(udp_trans, recv_buff_size, "recv"); + resize_buff_helper (udp_trans, send_buff_size, "send"); + + return udp_trans; +} diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp deleted file mode 100644 index 05352ffce..000000000 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ /dev/null @@ -1,319 +0,0 @@ -// -// Copyright 2010-2011 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . -// - -#include -#include //mtu -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace uhd; -using namespace uhd::transport; -namespace asio = boost::asio; - -//A reasonable number of frames for send/recv and async/sync -static const size_t DEFAULT_NUM_FRAMES = 32; - -/*********************************************************************** - * Reusable managed receiver buffer: - * - Initialize with memory and a release callback. - * - Call get new with a length in bytes to re-use. - **********************************************************************/ -class udp_zero_copy_asio_mrb : public managed_recv_buffer{ -public: - typedef boost::function release_cb_type; - - udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): - _mem(mem), _len(0), _release_cb(release_cb){/* NOP */} - - void release(void){ - if (_len == 0) return; - this->_release_cb(this); - _len = 0; - } - - sptr get_new(size_t len){ - _len = len; - return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter); - } - - template T cast(void) const{return static_cast(_mem);} - -private: - static void fake_deleter(void *obj){ - static_cast(obj)->release(); - } - - const void *get_buff(void) const{return _mem;} - size_t get_size(void) const{return _len;} - - void *_mem; - size_t _len; - release_cb_type _release_cb; -}; - -/*********************************************************************** - * Reusable managed send buffer: - * - Initialize with memory and a commit callback. - * - Call get new with a length in bytes to re-use. - **********************************************************************/ -class udp_zero_copy_asio_msb : public managed_send_buffer{ -public: - typedef boost::function commit_cb_type; - - udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): - _mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */} - - void commit(size_t len){ - if (_len == 0) return; - this->_commit_cb(this, len); - _len = 0; - } - - sptr get_new(size_t len){ - _len = len; - return sptr(this, &udp_zero_copy_asio_msb::fake_deleter); - } - -private: - static void fake_deleter(void *obj){ - static_cast(obj)->commit(0); - } - - void *get_buff(void) const{return _mem;} - size_t get_size(void) const{return _len;} - - void *_mem; - size_t _len; - commit_cb_type _commit_cb; -}; - -/*********************************************************************** - * Zero Copy UDP implementation with ASIO: - * This is the portable zero copy implementation for systems - * where a faster, platform specific solution is not available. - * However, it is not a true zero copy implementation as each - * send and recv requires a copy operation to/from userspace. - **********************************************************************/ -class udp_zero_copy_asio_impl : public udp_zero_copy{ -public: - typedef boost::shared_ptr sptr; - - udp_zero_copy_asio_impl( - const std::string &addr, - const std::string &port, - const device_addr_t &hints - ): - _recv_frame_size(size_t(hints.cast("recv_frame_size", udp_simple::mtu))), - _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_FRAMES))), - _send_frame_size(size_t(hints.cast("send_frame_size", udp_simple::mtu))), - _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_FRAMES))), - _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), - _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), - _pending_recv_buffs(_num_recv_frames), - _pending_send_buffs(_num_send_frames) - { - //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - - //resolve the address - asio::ip::udp::resolver resolver(_io_service); - asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); - asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - - //create, open, and connect the socket - _socket = new asio::ip::udp::socket(_io_service); - _socket->open(asio::ip::udp::v4()); - _socket->connect(receiver_endpoint); - _sock_fd = _socket->native(); - - //allocate re-usable managed receive buffers - for (size_t i = 0; i < get_num_recv_frames(); i++){ - _mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), - boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) - ); - handle_recv(&_mrb_pool.back()); - } - - //allocate re-usable managed send buffers - for (size_t i = 0; i < get_num_send_frames(); i++){ - _msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i), - boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) - ); - handle_send(&_msb_pool.back()); - } - } - - ~udp_zero_copy_asio_impl(void){ - delete _socket; - } - - //get size for internal socket buffer - template size_t get_buff_size(void) const{ - Opt option; - _socket->get_option(option); - return option.value(); - } - - //set size for internal socket buffer - template size_t resize_buff(size_t num_bytes){ - Opt option(num_bytes); - _socket->set_option(option); - return get_buff_size(); - } - - /******************************************************************* - * Receive implementation: - * - * Use select to perform a blocking receive with timeout. - * Return the managed receive buffer with the new length. - * When the caller is finished with the managed buffer, - * the managed receive buffer is released back into the queue. - ******************************************************************/ - UHD_INLINE bool is_recv_ready(double timeout){ - //setup timeval for timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = long(timeout*1e6); - - //setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(_sock_fd, &rset); - - //call select with timeout on receive socket - return ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0; - } - - managed_recv_buffer::sptr get_recv_buff(double timeout){ - udp_zero_copy_asio_mrb *mrb = NULL; - if (is_recv_ready(timeout) and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ - return mrb->get_new(::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0)); - } - return managed_recv_buffer::sptr(); - } - - UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ - _pending_recv_buffs.push_with_haste(mrb); - } - - void release(udp_zero_copy_asio_mrb *mrb){ - handle_recv(mrb); - } - - size_t get_num_recv_frames(void) const {return _num_recv_frames;} - size_t get_recv_frame_size(void) const {return _recv_frame_size;} - - /******************************************************************* - * Send implementation: - * - * Get a managed receive buffer immediately with max length set. - * The caller will fill the buffer and commit it when finished. - * The commit routine will perform a blocking send operation, - * and push the managed send buffer back into the queue. - ******************************************************************/ - managed_send_buffer::sptr get_send_buff(double timeout){ - udp_zero_copy_asio_msb *msb = NULL; - if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){ - return msb->get_new(_send_frame_size); - } - return managed_send_buffer::sptr(); - } - - UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ - _pending_send_buffs.push_with_haste(msb); - } - - void commit(udp_zero_copy_asio_msb *msb, size_t len){ - ::send(_sock_fd, msb->cast(), len, 0); - handle_send(msb); - } - - size_t get_num_send_frames(void) const {return _num_send_frames;} - size_t get_send_frame_size(void) const {return _send_frame_size;} - -private: - //memory management -> buffers and fifos - const size_t _recv_frame_size, _num_recv_frames; - const size_t _send_frame_size, _num_send_frames; - buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - bounded_buffer _pending_recv_buffs; - bounded_buffer _pending_send_buffs; - std::list _msb_pool; - std::list _mrb_pool; - - //asio guts -> socket and service - asio::io_service _io_service; - asio::ip::udp::socket *_socket; - int _sock_fd; -}; - -/*********************************************************************** - * UDP zero copy make function - **********************************************************************/ -template static void resize_buff_helper( - udp_zero_copy_asio_impl::sptr udp_trans, - const size_t target_size, - const std::string &name -){ - std::string help_message; - #if defined(UHD_PLATFORM_LINUX) - help_message = str(boost::format( - "Please run: sudo sysctl -w net.core.%smem_max=%d\n" - ) % ((name == "recv")?"r":"w") % target_size); - #endif /*defined(UHD_PLATFORM_LINUX)*/ - - //resize the buffer if size was provided - if (target_size > 0){ - size_t actual_size = udp_trans->resize_buff(target_size); - if (target_size != actual_size) std::cout << boost::format( - "Target %s sock buff size: %d bytes\n" - "Actual %s sock buff size: %d bytes" - ) % name % target_size % name % actual_size << std::endl; - else std::cout << boost::format( - "Current %s sock buff size: %d bytes" - ) % name % actual_size << std::endl; - if (actual_size < target_size) uhd::warning::post(str(boost::format( - "The %s buffer could not be resized sufficiently.\n" - "See the transport application notes on buffer resizing.\n%s" - ) % name % help_message)); - } -} - -udp_zero_copy::sptr udp_zero_copy::make( - const std::string &addr, - const std::string &port, - const device_addr_t &hints -){ - udp_zero_copy_asio_impl::sptr udp_trans( - new udp_zero_copy_asio_impl(addr, port, hints) - ); - - //extract buffer size hints from the device addr - size_t recv_buff_size = size_t(hints.cast("recv_buff_size", 0.0)); - size_t send_buff_size = size_t(hints.cast("send_buff_size", 0.0)); - - //call the helper to resize send and recv buffers - resize_buff_helper(udp_trans, recv_buff_size, "recv"); - resize_buff_helper (udp_trans, send_buff_size, "send"); - - return udp_trans; -} -- cgit v1.2.3 From 4787c77bb089bf50a47207bbad55bd65a44769e4 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 20 Feb 2011 02:41:03 -0800 Subject: udp: try non-blocking recv first for performance --- host/lib/transport/udp_common.hpp | 4 +++- host/lib/transport/udp_simple.cpp | 4 +--- host/lib/transport/udp_zero_copy.cpp | 24 ++++++++++++++---------- 3 files changed, 18 insertions(+), 14 deletions(-) (limited to 'host/lib/transport') diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp index 44067b5dc..47775d9c4 100644 --- a/host/lib/transport/udp_common.hpp +++ b/host/lib/transport/udp_common.hpp @@ -23,13 +23,15 @@ namespace uhd{ namespace transport{ + typedef boost::shared_ptr socket_sptr; + /*! * Wait for the socket to become ready for a receive operation. * \param sock_fd the open socket file descriptor * \param timeout the timeout duration in seconds * \return true when the socket is ready for receive */ - UHD_INLINE bool wait_for_recv(int sock_fd, double timeout){ + UHD_INLINE bool wait_for_recv_ready(int sock_fd, double timeout){ //setup timeval for timeout timeval tv; //If the tv_usec > 1 second on some platforms, select will diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 094f570ff..1ee036d52 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -28,8 +28,6 @@ namespace asio = boost::asio; **********************************************************************/ class udp_simple_impl : public udp_simple{ public: - typedef boost::shared_ptr socket_sptr; - udp_simple_impl( const std::string &addr, const std::string &port, bool bcast, bool connect ):_connected(connect){ @@ -58,7 +56,7 @@ public: } size_t recv(const asio::mutable_buffer &buff, double timeout){ - if (not wait_for_recv(_socket->native(), timeout)) return 0; + if (not wait_for_recv_ready(_socket->native(), timeout)) return 0; return _socket->receive(asio::buffer(buff)); } diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index 793fc6fba..dda3bb547 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -139,7 +139,7 @@ public: asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); //create, open, and connect the socket - _socket = new asio::ip::udp::socket(_io_service); + _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); _sock_fd = _socket->native(); @@ -161,10 +161,6 @@ public: } } - ~udp_zero_copy_asio_impl(void){ - delete _socket; - } - //get size for internal socket buffer template size_t get_buff_size(void) const{ Opt option; @@ -182,16 +178,24 @@ public: /******************************************************************* * Receive implementation: * - * Use select to perform a blocking receive with timeout. + * Perform a non-blocking receive for performance, + * and then fall back to a blocking receive with timeout. * Return the managed receive buffer with the new length. * When the caller is finished with the managed buffer, * the managed receive buffer is released back into the queue. ******************************************************************/ managed_recv_buffer::sptr get_recv_buff(double timeout){ udp_zero_copy_asio_mrb *mrb = NULL; - bool recv_ready = wait_for_recv(_sock_fd, timeout); - if (recv_ready and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ - return mrb->get_new(::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0)); + if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ + + #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported + ssize_t ret = ::recv(_sock_fd, mrb->cast(), _recv_frame_size, MSG_DONTWAIT); + if (ret > 0) return mrb->get_new(ret); + #endif + + if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new( + ::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0) + ); } return managed_recv_buffer::sptr(); } @@ -247,7 +251,7 @@ private: //asio guts -> socket and service asio::io_service _io_service; - asio::ip::udp::socket *_socket; + socket_sptr _socket; int _sock_fd; }; -- cgit v1.2.3 From a8bb5ec900d8f2d3d2f274a921d19b564c668323 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 21 Feb 2011 19:03:13 -0800 Subject: uhd: replace header checks in cmake files with more robust compile checks for features implemented different ifdefs in the cpp files --- host/include/uhd/transport/if_addrs.hpp | 1 - host/lib/transport/CMakeLists.txt | 25 +++++++++++++------ host/lib/transport/if_addrs.cpp | 16 ++++++------ host/lib/utils/CMakeLists.txt | 43 ++++++++++++++++++++++++--------- host/lib/utils/load_modules.cpp | 14 +++++------ host/lib/utils/thread_priority.cpp | 10 +++++--- 6 files changed, 70 insertions(+), 39 deletions(-) (limited to 'host/lib/transport') diff --git a/host/include/uhd/transport/if_addrs.hpp b/host/include/uhd/transport/if_addrs.hpp index c831750d7..689aff42c 100644 --- a/host/include/uhd/transport/if_addrs.hpp +++ b/host/include/uhd/transport/if_addrs.hpp @@ -31,7 +31,6 @@ namespace uhd{ namespace transport{ std::string inet; std::string mask; std::string bcast; - if_addrs_t(void); }; /*! diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index e58957154..a5bf9c5f1 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -48,20 +48,31 @@ ENDIF(ENABLE_USB) ######################################################################## MESSAGE(STATUS "") MESSAGE(STATUS "Configuring interface address discovery...") - +INCLUDE(CheckCXXSourceCompiles) INCLUDE(CheckIncludeFileCXX) -CHECK_INCLUDE_FILE_CXX(ifaddrs.h HAVE_IFADDRS_H) + +CHECK_CXX_SOURCE_COMPILES(" + #include + int main(){ + struct ifaddrs *ifap; + getifaddrs(&ifap); + return 0; + } + " HAVE_GETIFADDRS +) + CHECK_INCLUDE_FILE_CXX(winsock2.h HAVE_WINSOCK2_H) -IF(HAVE_IFADDRS_H) +IF(HAVE_GETIFADDRS) MESSAGE(STATUS " Interface address discovery supported through getifaddrs.") - SET(IF_ADDRS_DEFS HAVE_IFADDRS_H) + SET(IF_ADDRS_DEFS HAVE_GETIFADDRS) ELSEIF(HAVE_WINSOCK2_H) MESSAGE(STATUS " Interface address discovery supported through SIO_GET_INTERFACE_LIST.") - SET(IF_ADDRS_DEFS HAVE_WINSOCK2_H) -ELSE(HAVE_IFADDRS_H) + SET(IF_ADDRS_DEFS HAVE_SIO_GET_INTERFACE_LIST) +ELSE() MESSAGE(STATUS " Interface address discovery not supported.") -ENDIF(HAVE_IFADDRS_H) + SET(IF_ADDRS_DEFS HAVE_IF_ADDRS_DUMMY) +ENDIF() SET_SOURCE_FILES_PROPERTIES( ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp diff --git a/host/lib/transport/if_addrs.cpp b/host/lib/transport/if_addrs.cpp index 17cf8455b..b7c8ad844 100644 --- a/host/lib/transport/if_addrs.cpp +++ b/host/lib/transport/if_addrs.cpp @@ -20,14 +20,10 @@ #include #include -uhd::transport::if_addrs_t::if_addrs_t(void){ - /* NOP */ -} - /*********************************************************************** * Interface address discovery through ifaddrs api **********************************************************************/ -#if defined(HAVE_IFADDRS_H) +#ifdef HAVE_GETIFADDRS #include static boost::asio::ip::address_v4 sockaddr_to_ip_addr(sockaddr *addr){ @@ -59,10 +55,12 @@ std::vector uhd::transport::get_if_addrs(void){ return if_addrs; } +#endif /* HAVE_GETIFADDRS */ + /*********************************************************************** * Interface address discovery through windows api **********************************************************************/ -#elif defined(HAVE_WINSOCK2_H) +#ifdef HAVE_SIO_GET_INTERFACE_LIST #include std::vector uhd::transport::get_if_addrs(void){ @@ -98,13 +96,15 @@ std::vector uhd::transport::get_if_addrs(void){ return if_addrs; } +#endif /* HAVE_SIO_GET_INTERFACE_LIST */ + /*********************************************************************** * Interface address discovery not included **********************************************************************/ -#else /* HAVE_IFADDRS_H */ +#ifdef HAVE_IF_ADDRS_DUMMY std::vector uhd::transport::get_if_addrs(void){ return std::vector(); } -#endif /* HAVE_IFADDRS_H */ +#endif /* HAVE_IF_ADDRS_DUMMY */ diff --git a/host/lib/utils/CMakeLists.txt b/host/lib/utils/CMakeLists.txt index 743528189..a4d3b2db2 100644 --- a/host/lib/utils/CMakeLists.txt +++ b/host/lib/utils/CMakeLists.txt @@ -24,8 +24,8 @@ ######################################################################## MESSAGE(STATUS "") MESSAGE(STATUS "Configuring priority scheduling...") - INCLUDE(CheckCXXSourceCompiles) + CHECK_CXX_SOURCE_COMPILES(" #include int main(){ @@ -52,9 +52,10 @@ IF(HAVE_PTHREAD_SETSCHEDPARAM) ELSEIF(HAVE_WIN_SETTHREADPRIORITY) MESSAGE(STATUS " Priority scheduling supported through windows SetThreadPriority.") SET(THREAD_PRIO_DEFS HAVE_WIN_SETTHREADPRIORITY) -ELSE(HAVE_PTHREAD_SETSCHEDPARAM) +ELSE() MESSAGE(STATUS " Priority scheduling not supported.") -ENDIF(HAVE_PTHREAD_SETSCHEDPARAM) + SET(THREAD_PRIO_DEFS HAVE_THREAD_PRIO_DUMMY) +ENDIF() SET_SOURCE_FILES_PROPERTIES( ${CMAKE_CURRENT_SOURCE_DIR}/thread_priority.cpp @@ -66,21 +67,39 @@ SET_SOURCE_FILES_PROPERTIES( ######################################################################## MESSAGE(STATUS "") MESSAGE(STATUS "Configuring module loading...") +INCLUDE(CheckCXXSourceCompiles) -INCLUDE(CheckIncludeFileCXX) -CHECK_INCLUDE_FILE_CXX(dlfcn.h HAVE_DLFCN_H) -CHECK_INCLUDE_FILE_CXX(windows.h HAVE_WINDOWS_H) +SET(CMAKE_REQUIRED_LIBRARIES ${CMAKE_DL_LIBS}) +CHECK_CXX_SOURCE_COMPILES(" + #include + int main(){ + dlopen(0, 0); + return 0; + } + " HAVE_DLOPEN +) +UNSET(CMAKE_REQUIRED_LIBRARIES) + +CHECK_CXX_SOURCE_COMPILES(" + #include + int main(){ + LoadLibrary(0); + return 0; + } + " HAVE_LOAD_LIBRARY +) -IF(HAVE_DLFCN_H) +IF(HAVE_DLOPEN) MESSAGE(STATUS " Module loading supported through dlopen.") - SET(LOAD_MODULES_DEFS HAVE_DLFCN_H) + SET(LOAD_MODULES_DEFS HAVE_DLOPEN) LIBUHD_APPEND_LIBS(${CMAKE_DL_LIBS}) -ELSEIF(HAVE_WINDOWS_H) +ELSEIF(HAVE_LOAD_LIBRARY) MESSAGE(STATUS " Module loading supported through LoadLibrary.") - SET(LOAD_MODULES_DEFS HAVE_WINDOWS_H) -ELSE(HAVE_DLFCN_H) + SET(LOAD_MODULES_DEFS HAVE_LOAD_LIBRARY) +ELSE() MESSAGE(STATUS " Module loading not supported.") -ENDIF(HAVE_DLFCN_H) + SET(LOAD_MODULES_DEFS HAVE_LOAD_MODULES_DUMMY) +ENDIF() SET_SOURCE_FILES_PROPERTIES( ${CMAKE_CURRENT_SOURCE_DIR}/load_modules.cpp diff --git a/host/lib/utils/load_modules.cpp b/host/lib/utils/load_modules.cpp index 623d31eb6..fa9b22438 100644 --- a/host/lib/utils/load_modules.cpp +++ b/host/lib/utils/load_modules.cpp @@ -29,9 +29,8 @@ namespace fs = boost::filesystem; /*********************************************************************** * Module Load Function **********************************************************************/ -#if defined(HAVE_DLFCN_H) +#ifdef HAVE_DLOPEN #include - static void load_module(const std::string &file_name){ if (dlopen(file_name.c_str(), RTLD_LAZY) == NULL){ throw std::runtime_error(str( @@ -39,10 +38,11 @@ static void load_module(const std::string &file_name){ )); } } +#endif /* HAVE_DLOPEN */ -#elif defined(HAVE_WINDOWS_H) -#include +#ifdef HAVE_LOAD_LIBRARY +#include static void load_module(const std::string &file_name){ if (LoadLibrary(file_name.c_str()) == NULL){ throw std::runtime_error(str( @@ -50,16 +50,16 @@ static void load_module(const std::string &file_name){ )); } } +#endif /* HAVE_LOAD_LIBRARY */ -#else +#ifdef HAVE_LOAD_MODULES_DUMMY static void load_module(const std::string &file_name){ throw std::runtime_error(str( boost::format("Module loading not supported: Cannot load \"%s\"") % file_name )); } - -#endif +#endif /* HAVE_LOAD_MODULES_DUMMY */ /*********************************************************************** * Load Modules diff --git a/host/lib/utils/thread_priority.cpp b/host/lib/utils/thread_priority.cpp index 40b74f655..18f372ec0 100644 --- a/host/lib/utils/thread_priority.cpp +++ b/host/lib/utils/thread_priority.cpp @@ -44,7 +44,7 @@ static void check_priority_range(float priority){ /*********************************************************************** * Pthread API to set priority **********************************************************************/ -#if defined(HAVE_PTHREAD_SETSCHEDPARAM) +#ifdef HAVE_PTHREAD_SETSCHEDPARAM #include void uhd::set_thread_priority(float priority, bool realtime){ @@ -67,11 +67,12 @@ static void check_priority_range(float priority){ int ret = pthread_setschedparam(pthread_self(), policy, &sp); if (ret != 0) throw std::runtime_error("error in pthread_setschedparam"); } +#endif /* HAVE_PTHREAD_SETSCHEDPARAM */ /*********************************************************************** * Windows API to set priority **********************************************************************/ -#elif defined(HAVE_WIN_SETTHREADPRIORITY) +#ifdef HAVE_WIN_SETTHREADPRIORITY #include void uhd::set_thread_priority(float priority, bool realtime){ @@ -93,13 +94,14 @@ static void check_priority_range(float priority){ if (SetThreadPriority(GetCurrentThread(), priorities[pri_index]) == 0) throw std::runtime_error("error in SetThreadPriority"); } +#endif /* HAVE_WIN_SETTHREADPRIORITY */ /*********************************************************************** * Unimplemented API to set priority **********************************************************************/ -#else +#ifdef HAVE_LOAD_MODULES_DUMMY void uhd::set_thread_priority(float, bool){ throw std::runtime_error("set thread priority not implemented"); } -#endif /* HAVE_PTHREAD_SETSCHEDPARAM */ +#endif /* HAVE_LOAD_MODULES_DUMMY */ -- cgit v1.2.3