diff options
-rw-r--r-- | host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp | 115 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/udp_common.hpp | 200 | ||||
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/transport/tcp_zero_copy.cpp | 5 | ||||
-rw-r--r-- | host/lib/transport/udp_boost_asio_link.cpp | 126 | ||||
-rw-r--r-- | host/lib/transport/udp_common.hpp | 71 | ||||
-rw-r--r-- | host/lib/transport/udp_simple.cpp | 6 | ||||
-rw-r--r-- | host/lib/transport/udp_wsa_zero_copy.cpp | 2 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy.cpp | 159 |
9 files changed, 491 insertions, 195 deletions
diff --git a/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp b/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp new file mode 100644 index 000000000..2e6f731c9 --- /dev/null +++ b/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp @@ -0,0 +1,115 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP +#define INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/types/device_addr.hpp> +#include <uhdlib/transport/link_base.hpp> +#include <uhdlib/transport/links.hpp> +#include <uhdlib/transport/udp_common.hpp> +#include <boost/asio.hpp> +#include <memory> +#include <vector> + +namespace uhd { namespace transport { + +class udp_boost_asio_frame_buff : public frame_buff +{ +public: + udp_boost_asio_frame_buff(void* mem) + { + _data = mem; + } +}; + +class udp_boost_asio_link : public recv_link_base<udp_boost_asio_link>, + public send_link_base<udp_boost_asio_link> +{ +public: + using sptr = std::shared_ptr<udp_boost_asio_link>; + + /*! + * Make a new udp link. + * + * \param addr a string representing the destination address + * \param port a string representing the destination port + * \param params Values for frame sizes, num frames, and buffer sizes + * \param[out] recv_socket_buff_size Returns the recv socket buffer size + * \param[out] send_socket_buff_size Returns the send socket buffer size + */ + static sptr make(const std::string& addr, + const std::string& port, + const link_params_t& params, + size_t& recv_socket_buff_size, + size_t& send_socket_buff_size); + + /*! Return the local port of the UDP connection. Port is in host byte order. + * + * \returns Port number or 0 if port number couldn't be identified. + */ + uint16_t get_local_port() const; + + /*! Return the local IP address of the UDP connection as a dotted string. + * + * \returns IP address as a string or empty string if the IP address could + * not be identified. + */ + std::string get_local_addr() const; + +private: + using recv_link_base_t = recv_link_base<udp_boost_asio_link>; + using send_link_base_t = send_link_base<udp_boost_asio_link>; + + // Friend declarations to allow base classes to call private methods + friend recv_link_base_t; + friend send_link_base_t; + + udp_boost_asio_link( + const std::string& addr, const std::string& port, const link_params_t& params); + + size_t resize_recv_socket_buffer(size_t num_bytes); + size_t resize_send_socket_buffer(size_t num_bytes); + + // Methods called by recv_link_base + UHD_FORCE_INLINE size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms) + { + return recv_udp_packet(_sock_fd, buff.data(), get_recv_frame_size(), timeout_ms); + } + + UHD_FORCE_INLINE void release_recv_buff_derived(frame_buff& /*buff*/) + { + // No-op + } + + // Methods called by send_link_base + UHD_FORCE_INLINE bool get_send_buff_derived( + frame_buff& /*buff*/, int32_t /*timeout_ms*/) + { + return true; + } + + UHD_FORCE_INLINE void release_send_buff_derived(frame_buff& buff) + { + send_udp_packet(_sock_fd, buff.data(), buff.packet_size()); + } + + buffer_pool::sptr _recv_memory_pool; + buffer_pool::sptr _send_memory_pool; + + std::vector<udp_boost_asio_frame_buff> _recv_buffs; + std::vector<udp_boost_asio_frame_buff> _send_buffs; + + boost::asio::io_service _io_service; + std::shared_ptr<boost::asio::ip::udp::socket> _socket; + int _sock_fd; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP */ diff --git a/host/lib/include/uhdlib/transport/udp_common.hpp b/host/lib/include/uhdlib/transport/udp_common.hpp new file mode 100644 index 000000000..5f5a18c27 --- /dev/null +++ b/host/lib/include/uhdlib/transport/udp_common.hpp @@ -0,0 +1,200 @@ +// +// Copyright 2011 Ettus Research LLC +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_TRANSPORT_UDP_COMMON_HPP +#define INCLUDED_TRANSPORT_UDP_COMMON_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <boost/asio.hpp> +#include <boost/format.hpp> +#include <thread> + +namespace uhd { namespace transport { + +// Jumbo frames can be up to 9600 bytes; +constexpr size_t MAX_ETHERNET_MTU = 9600; + +constexpr size_t UDP_DEFAULT_NUM_FRAMES = 1; + +// Based on common 1500 byte MTU for 1GbE +constexpr size_t UDP_DEFAULT_FRAME_SIZE = 1472; + +// 20ms of data for 1GbE link (in bytes) +constexpr size_t UDP_DEFAULT_BUFF_SIZE = 2500000; + + +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) +// MacOS limits socket buffer size to 1 Mib +static const size_t MAX_BUFF_SIZE_ETH_MACOS = 0x100000; // 1Mib +#endif + +typedef std::shared_ptr<boost::asio::ip::udp::socket> socket_sptr; + +/*! + * Wait for the socket to become ready for a receive operation. + * \param sock_fd the open socket file descriptor + * \param timeout_ms the timeout duration in milliseconds + * \return true when the socket is ready for receive + */ +UHD_INLINE bool wait_for_recv_ready(int sock_fd, int32_t timeout_ms) +{ +#ifdef UHD_PLATFORM_WIN32 // select is more portable than poll unfortunately + // setup timeval for timeout + timeval tv; + // If the tv_usec > 1 second on some platforms, select will + // error EINVAL: An invalid timeout interval was specified. + tv.tv_sec = int(timeout_ms / 1000); + tv.tv_usec = int(timeout_ms * 1000) % 1000000; + + // setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(sock_fd, &rset); + +// http://www.gnu.org/s/hello/manual/libc/Interrupted-Primitives.html +// This macro is provided with gcc to properly deal with EINTR. +// If not provided, define an empty macro, assume that is OK +# ifndef TEMP_FAILURE_RETRY +# define TEMP_FAILURE_RETRY(x) (x) +# endif + + // call select with timeout on receive socket + return TEMP_FAILURE_RETRY(::select(sock_fd + 1, &rset, NULL, NULL, &tv)) > 0; +#else + pollfd pfd_read; + pfd_read.fd = sock_fd; + pfd_read.events = POLLIN; + + // call poll with timeout on receive socket + return ::poll(&pfd_read, 1, (int)timeout_ms) > 0; +#endif +} + +UHD_INLINE socket_sptr open_udp_socket( + const std::string& addr, const std::string& port, boost::asio::io_service& io_service) +{ + using udp = boost::asio::ip::udp; + + // resolve the address + udp::resolver resolver(io_service); + udp::resolver::query query(udp::v4(), addr, port); + udp::endpoint receiver_endpoint = *resolver.resolve(query); + + // create, open, and connect the socket + socket_sptr socket = socket_sptr(new udp::socket(io_service)); + socket->open(udp::v4()); + socket->connect(receiver_endpoint); + + return socket; +} + +UHD_INLINE size_t recv_udp_packet( + int sock_fd, void* mem, size_t frame_size, int32_t timeout_ms) +{ + ssize_t len; + +#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported + len = ::recv(sock_fd, (char*)mem, frame_size, MSG_DONTWAIT); + if (len > 0) { + return len; + } +#endif + + if (wait_for_recv_ready(sock_fd, timeout_ms)) { + len = ::recv(sock_fd, (char*)mem, frame_size, 0); + if (len == 0) { + throw uhd::io_error("socket closed"); + } + if (len < 0) { + throw uhd::io_error( + str(boost::format("recv error on socket: %s") % strerror(errno))); + } + return len; + } + + return 0; // timeout +} + +UHD_INLINE void send_udp_packet(int sock_fd, void* mem, size_t len) +{ + // Retry logic because send may fail with ENOBUFS. + // This is known to occur at least on some OSX systems. + // But it should be safe to always check for the error. + while (true) { + const ssize_t ret = ::send(sock_fd, (const char*)mem, len, 0); + if (ret == ssize_t(len)) + break; + if (ret == -1 and errno == ENOBUFS) { + std::this_thread::sleep_for(std::chrono::microseconds(1)); + continue; // try to send again + } + if (ret == -1) { + throw uhd::io_error( + str(boost::format("send error on socket: %s") % strerror(errno))); + } + UHD_ASSERT_THROW(ret == ssize_t(len)); + } +} + +template <typename Opt> +size_t get_udp_socket_buffer_size(socket_sptr socket) +{ + Opt option; + socket->get_option(option); + return option.value(); +} + +template <typename Opt> +size_t resize_udp_socket_buffer(socket_sptr socket, size_t num_bytes) +{ +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + // limit buffer resize on macos or it will error + num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS); +#endif + Opt option(num_bytes); + socket->set_option(option); + return get_udp_socket_buffer_size<Opt>(socket); +} + +UHD_INLINE size_t resize_udp_socket_buffer_with_warning( + std::function<size_t(size_t)> resize_fn, + const size_t target_size, + const std::string& name) +{ + size_t actual_size = 0; + std::string help_message; +#if defined(UHD_PLATFORM_LINUX) + help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d") + % ((name == "recv") ? "r" : "w") % target_size); +#endif /*defined(UHD_PLATFORM_LINUX)*/ + + // resize the buffer if size was provided + if (target_size > 0) { + actual_size = resize_fn(target_size); + + UHD_LOGGER_TRACE("UDP") + << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name + % target_size % actual_size; + if (actual_size < target_size) + UHD_LOGGER_WARNING("UDP") + << boost::format( + "The %s buffer could not be resized sufficiently.\n" + "Target sock buff size: %d bytes.\n" + "Actual sock buff size: %d bytes.\n" + "See the transport application notes on buffer resizing.\n%s") + % name % target_size % actual_size % help_message; + } + + return actual_size; +} + + +}} // namespace uhd::transport + +#endif /* INCLUDED_TRANSPORT_UDP_COMMON_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index a9663c89a..003beeee4 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -81,8 +81,10 @@ set_source_files_properties( ######################################################################## if(WIN32) LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp) + LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_boost_asio_link.cpp) else() LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp) + LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_boost_asio_link.cpp) endif() #On windows, the boost asio implementation uses the winsock2 library. diff --git a/host/lib/transport/tcp_zero_copy.cpp b/host/lib/transport/tcp_zero_copy.cpp index 5cb713427..01bca900f 100644 --- a/host/lib/transport/tcp_zero_copy.cpp +++ b/host/lib/transport/tcp_zero_copy.cpp @@ -5,11 +5,11 @@ // SPDX-License-Identifier: GPL-3.0-or-later // -#include "udp_common.hpp" #include <uhd/transport/buffer_pool.hpp> #include <uhd/transport/tcp_zero_copy.hpp> #include <uhd/utils/log.hpp> #include <uhdlib/utils/atomic.hpp> +#include <uhdlib/transport/udp_common.hpp> #include <boost/format.hpp> #include <boost/make_shared.hpp> #include <chrono> @@ -52,8 +52,9 @@ public: return make(this, _mem, size_t(_len)); } #endif + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); - if (wait_for_recv_ready(_sock_fd, timeout)) { + if (wait_for_recv_ready(_sock_fd, timeout_ms)) { _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0); index++; // advances the caller's buffer return make(this, _mem, size_t(_len)); diff --git a/host/lib/transport/udp_boost_asio_link.cpp b/host/lib/transport/udp_boost_asio_link.cpp new file mode 100644 index 000000000..95d68ba91 --- /dev/null +++ b/host/lib/transport/udp_boost_asio_link.cpp @@ -0,0 +1,126 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_boost_asio_link.hpp> +#include <boost/format.hpp> + +using namespace uhd::transport; + +namespace asio = boost::asio; + +udp_boost_asio_link::udp_boost_asio_link( + const std::string& addr, const std::string& port, const link_params_t& params) + : recv_link_base_t(params.num_recv_frames, params.recv_frame_size) + , send_link_base_t(params.num_send_frames, params.send_frame_size) + , _recv_memory_pool(buffer_pool::make(params.num_recv_frames, params.recv_frame_size)) + , _send_memory_pool(buffer_pool::make(params.num_send_frames, params.send_frame_size)) +{ + for (size_t i = 0; i < params.num_recv_frames; i++) { + _recv_buffs.push_back(udp_boost_asio_frame_buff(_recv_memory_pool->at(i))); + } + + for (size_t i = 0; i < params.num_send_frames; i++) { + _send_buffs.push_back(udp_boost_asio_frame_buff(_send_memory_pool->at(i))); + } + + for (auto& buff : _recv_buffs) { + recv_link_base_t::preload_free_buff(&buff); + } + + for (auto& buff : _send_buffs) { + send_link_base_t::preload_free_buff(&buff); + } + + // create, open, and connect the socket + _socket = open_udp_socket(addr, port, _io_service); + _sock_fd = _socket->native_handle(); + + UHD_LOGGER_TRACE("UDP") + << boost::format("Created UDP link to %s:%s") % addr % port; + UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s") + % get_local_addr() % get_local_port(); +} + +uint16_t udp_boost_asio_link::get_local_port() const +{ + return _socket->local_endpoint().port(); +} + +std::string udp_boost_asio_link::get_local_addr() const +{ + return _socket->local_endpoint().address().to_string(); +} + +size_t udp_boost_asio_link::resize_recv_socket_buffer(size_t num_bytes) +{ + return resize_udp_socket_buffer<asio::socket_base::receive_buffer_size>( + _socket, num_bytes); +} + +size_t udp_boost_asio_link::resize_send_socket_buffer(size_t num_bytes) +{ + return resize_udp_socket_buffer<asio::socket_base::send_buffer_size>( + _socket, num_bytes); +} + +udp_boost_asio_link::sptr udp_boost_asio_link::make(const std::string& addr, + const std::string& port, + const link_params_t& params, + size_t& recv_socket_buff_size, + size_t& send_socket_buff_size) +{ + UHD_ASSERT_THROW(params.num_recv_frames != 0); + UHD_ASSERT_THROW(params.num_send_frames != 0); + UHD_ASSERT_THROW(params.recv_frame_size != 0); + UHD_ASSERT_THROW(params.send_frame_size != 0); + UHD_ASSERT_THROW(params.recv_buff_size != 0); + UHD_ASSERT_THROW(params.send_buff_size != 0); + +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + // limit buffer size on macos to avoid the warning issued by + // resize_buff_helper + if (params.recv_buff_size > MAX_BUFF_SIZE_ETH_MACOS) { + params.recv_buff_size = MAX_BUFF_SIZE_ETH_MACOS; + } + if (params.send_buff_size > MAX_BUFF_SIZE_ETH_MACOS) { + params.send_buff_size = MAX_BUFF_SIZE_ETH_MACOS; + } +#endif + + udp_boost_asio_link::sptr link( + new udp_boost_asio_link(addr, port, params)); + + // call the helper to resize send and recv buffers + + recv_socket_buff_size = resize_udp_socket_buffer_with_warning( + [link](size_t size) { return link->resize_recv_socket_buffer(size); }, + params.recv_buff_size, + "recv"); + send_socket_buff_size = resize_udp_socket_buffer_with_warning( + [link](size_t size) { return link->resize_send_socket_buffer(size); }, + params.send_buff_size, + "send"); + + if (recv_socket_buff_size < params.num_recv_frames * MAX_ETHERNET_MTU) { + UHD_LOG_WARNING("UDP", + "The current recv_buff_size of " + << params.recv_buff_size + << " is less than the minimum recommended size of " + << params.num_recv_frames * MAX_ETHERNET_MTU + << " and may result in dropped packets on some NICs"); + } + if (send_socket_buff_size < params.num_send_frames * MAX_ETHERNET_MTU) { + UHD_LOG_WARNING("UDP", + "The current send_buff_size of " + << params.send_buff_size + << " is less than the minimum recommended size of " + << params.num_send_frames * MAX_ETHERNET_MTU + << " and may result in dropped packets on some NICs"); + } + + return link; +} diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp deleted file mode 100644 index f320e3d85..000000000 --- a/host/lib/transport/udp_common.hpp +++ /dev/null @@ -1,71 +0,0 @@ -// -// Copyright 2011 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP -#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP - -#include <uhd/config.hpp> -#include <boost/asio.hpp> - -namespace uhd { namespace transport { - -// Jumbo frames can be up to 9600 bytes; -static const size_t MAX_ETHERNET_MTU = 9600; - -#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) -// MacOS limits socket buffer size to 1 Mib -static const size_t MAX_BUFF_SIZE_ETH_MACOS = 0x100000; // 1Mib -#endif - -typedef boost::shared_ptr<boost::asio::ip::udp::socket> socket_sptr; - -/*! - * Wait for the socket to become ready for a receive operation. - * \param sock_fd the open socket file descriptor - * \param timeout the timeout duration in seconds - * \return true when the socket is ready for receive - */ -UHD_INLINE bool wait_for_recv_ready(int sock_fd, double timeout) -{ -#ifdef UHD_PLATFORM_WIN32 // select is more portable than poll unfortunately - // setup timeval for timeout - timeval tv; - // If the tv_usec > 1 second on some platforms, select will - // error EINVAL: An invalid timeout interval was specified. - tv.tv_sec = int(timeout); - tv.tv_usec = int(timeout * 1000000) % 1000000; - - // setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(sock_fd, &rset); - -// http://www.gnu.org/s/hello/manual/libc/Interrupted-Primitives.html -// This macro is provided with gcc to properly deal with EINTR. -// If not provided, define an empty macro, assume that is OK -# ifndef TEMP_FAILURE_RETRY -# define TEMP_FAILURE_RETRY(x) (x) -# endif - - // call select with timeout on receive socket - return TEMP_FAILURE_RETRY(::select(sock_fd + 1, &rset, NULL, NULL, &tv)) > 0; -#else - // calculate the total timeout in milliseconds (from seconds) - int total_timeout = int(timeout * 1000); - - pollfd pfd_read; - pfd_read.fd = sock_fd; - pfd_read.events = POLLIN; - - // call poll with timeout on receive socket - return ::poll(&pfd_read, 1, total_timeout) > 0; -#endif -} - -}} // namespace uhd::transport - -#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 48d7b500e..e10dff7f8 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -5,9 +5,9 @@ // SPDX-License-Identifier: GPL-3.0-or-later // -#include "udp_common.hpp" #include <uhd/transport/udp_simple.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_common.hpp> #include <boost/format.hpp> using namespace uhd::transport; @@ -53,7 +53,9 @@ public: size_t recv(const asio::mutable_buffer& buff, double timeout) { - if (not wait_for_recv_ready(_socket->native_handle(), timeout)) + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + + if (not wait_for_recv_ready(_socket->native_handle(), timeout_ms)) return 0; return _socket->receive_from(asio::buffer(buff), _recv_endpoint); } diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp index 8f83ea5ef..36837cda4 100644 --- a/host/lib/transport/udp_wsa_zero_copy.cpp +++ b/host/lib/transport/udp_wsa_zero_copy.cpp @@ -5,11 +5,11 @@ // SPDX-License-Identifier: GPL-3.0-or-later // -#include "udp_common.hpp" #include <uhd/transport/buffer_pool.hpp> #include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_common.hpp> #include <boost/format.hpp> #include <vector> diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index e3df80da6..44df9526d 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -5,11 +5,10 @@ // SPDX-License-Identifier: GPL-3.0-or-later // -#include "udp_common.hpp" #include <uhd/transport/buffer_pool.hpp> -#include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/utils/log.hpp> +#include <uhdlib/transport/udp_common.hpp> #include <uhdlib/utils/atomic.hpp> #include <boost/format.hpp> #include <boost/make_shared.hpp> @@ -19,12 +18,8 @@ using namespace uhd; using namespace uhd::transport; -namespace asio = boost::asio; -constexpr size_t UDP_ZERO_COPY_DEFAULT_NUM_FRAMES = 1; -constexpr size_t UDP_ZERO_COPY_DEFAULT_FRAME_SIZE = - 1472; // Based on common 1500 byte MTU for 1GbE. -constexpr size_t UDP_ZERO_COPY_DEFAULT_BUFF_SIZE = - 2500000; // 20ms of data for 1GbE link (in bytes) +namespace asio = boost::asio; + /*********************************************************************** * Check registry for correct fast-path setting (windows only) **********************************************************************/ @@ -80,22 +75,11 @@ public: if (not _claimer.claim_with_wait(timeout)) return sptr(); -#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported - _len = ::recv(_sock_fd, (char*)_mem, _frame_size, MSG_DONTWAIT); - if (_len > 0) { - index++; // advances the caller's buffer - return make(this, _mem, size_t(_len)); - } -#endif + const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000); + _len = recv_udp_packet(_sock_fd, _mem, _frame_size, timeout_ms); - if (wait_for_recv_ready(_sock_fd, timeout)) { - _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0); - if (_len == 0) - throw uhd::io_error("socket closed"); - if (_len < 0) - throw uhd::io_error( - str(boost::format("recv error on socket: %s") % strerror(errno))); - index++; // advances the caller's buffer + if (_len > 0) { + index++; return make(this, _mem, size_t(_len)); } @@ -125,23 +109,7 @@ public: void release(void) { - // Retry logic because send may fail with ENOBUFS. - // This is known to occur at least on some OSX systems. - // But it should be safe to always check for the error. - while (true) { - const ssize_t ret = ::send(_sock_fd, (const char*)_mem, size(), 0); - if (ret == ssize_t(size())) - break; - if (ret == -1 and errno == ENOBUFS) { - std::this_thread::sleep_for(std::chrono::microseconds(1)); - continue; // try to send again - } - if (ret == -1) { - throw uhd::io_error( - str(boost::format("send error on socket: %s") % strerror(errno))); - } - UHD_ASSERT_THROW(ret == ssize_t(size())); - } + send_udp_packet(_sock_fd, _mem, size()); _claimer.release(); } @@ -193,15 +161,7 @@ public: check_registry_for_fast_send_threshold(this->get_send_frame_size()); #endif /*CHECK_REG_SEND_THRESH*/ - // resolve the address - asio::ip::udp::resolver resolver(_io_service); - asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); - asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - - // create, open, and connect the socket - _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); - _socket->open(asio::ip::udp::v4()); - _socket->connect(receiver_endpoint); + _socket = open_udp_socket(addr, port, _io_service); _sock_fd = _socket->native_handle(); UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s") @@ -220,24 +180,16 @@ public: } } - // get size for internal socket buffer - template <typename Opt> size_t get_buff_size(void) const + size_t resize_send_socket_buffer(size_t num_bytes) { - Opt option; - _socket->get_option(option); - return option.value(); + return resize_udp_socket_buffer<asio::socket_base::send_buffer_size>( + _socket, num_bytes); } - // set size for internal socket buffer - template <typename Opt> size_t resize_buff(size_t num_bytes) + size_t resize_recv_socket_buffer(size_t num_bytes) { -#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) - // limit buffer resize on macos or it will error - num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS); -#endif - Opt option(num_bytes); - _socket->set_option(option); - return get_buff_size<Opt>(); + return resize_udp_socket_buffer<asio::socket_base::receive_buffer_size>( + _socket, num_bytes); } /******************************************************************* @@ -308,37 +260,6 @@ private: /*********************************************************************** * UDP zero copy make function **********************************************************************/ -template <typename Opt> -static size_t resize_buff_helper(udp_zero_copy_asio_impl::sptr udp_trans, - const size_t target_size, - const std::string& name) -{ - size_t actual_size = 0; - std::string help_message; -#if defined(UHD_PLATFORM_LINUX) - help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d") - % ((name == "recv") ? "r" : "w") % target_size); -#endif /*defined(UHD_PLATFORM_LINUX)*/ - - // resize the buffer if size was provided - if (target_size > 0) { - actual_size = udp_trans->resize_buff<Opt>(target_size); - UHD_LOGGER_TRACE("UDP") - << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name - % target_size % actual_size; - if (actual_size < target_size) - UHD_LOGGER_WARNING("UDP") - << boost::format( - "The %s buffer could not be resized sufficiently.\n" - "Target sock buff size: %d bytes.\n" - "Actual sock buff size: %d bytes.\n" - "See the transport application notes on buffer resizing.\n%s") - % name % target_size % actual_size % help_message; - } - - return actual_size; -} - udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, const std::string& port, const zero_copy_xport_params& default_buff_args, @@ -362,26 +283,24 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, size_t(hints.cast<double>("send_buff_size", default_buff_args.send_buff_size)); if (xport_params.num_recv_frames == 0) { - UHD_LOG_TRACE("UDP", - "Default value for num_recv_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); - xport_params.num_recv_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; + UHD_LOG_TRACE( + "UDP", "Default value for num_recv_frames: " << UDP_DEFAULT_NUM_FRAMES); + xport_params.num_recv_frames = UDP_DEFAULT_NUM_FRAMES; } if (xport_params.num_send_frames == 0) { - UHD_LOG_TRACE("UDP", - "Default value for no num_send_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); - xport_params.num_send_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; + UHD_LOG_TRACE( + "UDP", "Default value for no num_send_frames: " << UDP_DEFAULT_NUM_FRAMES); + xport_params.num_send_frames = UDP_DEFAULT_NUM_FRAMES; } if (xport_params.recv_frame_size == 0) { UHD_LOG_TRACE("UDP", - "Using default value for recv_frame_size: " - << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); - xport_params.recv_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; + "Using default value for recv_frame_size: " << UDP_DEFAULT_FRAME_SIZE); + xport_params.recv_frame_size = UDP_DEFAULT_FRAME_SIZE; } if (xport_params.send_frame_size == 0) { - UHD_LOG_TRACE("UDP", - "Using default value for send_frame_size, " - << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); - xport_params.send_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; + UHD_LOG_TRACE( + "UDP", "Using default value for send_frame_size, " << UDP_DEFAULT_FRAME_SIZE); + xport_params.send_frame_size = UDP_DEFAULT_FRAME_SIZE; } UHD_LOG_TRACE("UDP", @@ -391,15 +310,15 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, if (xport_params.recv_buff_size == 0) { UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size"); - xport_params.recv_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, - xport_params.num_recv_frames * MAX_ETHERNET_MTU); + xport_params.recv_buff_size = std::max( + UDP_DEFAULT_BUFF_SIZE, xport_params.num_recv_frames * MAX_ETHERNET_MTU); UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size" << xport_params.recv_buff_size); } if (xport_params.send_buff_size == 0) { UHD_LOG_TRACE("UDP", "default_buff_args has no send_buff_size"); - xport_params.send_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, - xport_params.num_send_frames * MAX_ETHERNET_MTU); + xport_params.send_buff_size = std::max( + UDP_DEFAULT_BUFF_SIZE, xport_params.num_send_frames * MAX_ETHERNET_MTU); } #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) @@ -419,18 +338,20 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, new udp_zero_copy_asio_impl(addr, port, xport_params)); // call the helper to resize send and recv buffers - buff_params_out.recv_buff_size = - resize_buff_helper<asio::socket_base::receive_buffer_size>( - udp_trans, xport_params.recv_buff_size, "recv"); - buff_params_out.send_buff_size = - resize_buff_helper<asio::socket_base::send_buffer_size>( - udp_trans, xport_params.send_buff_size, "send"); + buff_params_out.recv_buff_size = resize_udp_socket_buffer_with_warning( + [udp_trans](size_t size) { return udp_trans->resize_recv_socket_buffer(size); }, + xport_params.recv_buff_size, + "recv"); + buff_params_out.send_buff_size = resize_udp_socket_buffer_with_warning( + [udp_trans](size_t size) { return udp_trans->resize_send_socket_buffer(size); }, + xport_params.send_buff_size, + "send"); if (buff_params_out.recv_buff_size < xport_params.num_recv_frames * MAX_ETHERNET_MTU) { UHD_LOG_WARNING("UDP", "The current recv_buff_size of " - << buff_params_out.recv_buff_size + << xport_params.recv_buff_size << " is less than the minimum recommended size of " << xport_params.num_recv_frames * MAX_ETHERNET_MTU << " and may result in dropped packets on some NICs"); @@ -439,7 +360,7 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, < xport_params.num_send_frames * MAX_ETHERNET_MTU) { UHD_LOG_WARNING("UDP", "The current send_buff_size of " - << buff_params_out.send_buff_size + << xport_params.send_buff_size << " is less than the minimum recommended size of " << xport_params.num_send_frames * MAX_ETHERNET_MTU << " and may result in dropped packets on some NICs"); |