aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2019-03-25 13:11:17 -0500
committerMartin Braun <martin.braun@ettus.com>2019-11-26 11:49:20 -0800
commit1e65500d791461be9aa7a2d2646d463f536f49e3 (patch)
treea603d8b56d41012e39f56531f017d0ebb5dedebd
parent6f3c201a802079a3d565c5f14e1222743097b459 (diff)
downloaduhd-1e65500d791461be9aa7a2d2646d463f536f49e3.tar.gz
uhd-1e65500d791461be9aa7a2d2646d463f536f49e3.tar.bz2
uhd-1e65500d791461be9aa7a2d2646d463f536f49e3.zip
uhd: add udp boost asio implementation of transport interface
-rw-r--r--host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp115
-rw-r--r--host/lib/include/uhdlib/transport/udp_common.hpp200
-rw-r--r--host/lib/transport/CMakeLists.txt2
-rw-r--r--host/lib/transport/tcp_zero_copy.cpp5
-rw-r--r--host/lib/transport/udp_boost_asio_link.cpp126
-rw-r--r--host/lib/transport/udp_common.hpp71
-rw-r--r--host/lib/transport/udp_simple.cpp6
-rw-r--r--host/lib/transport/udp_wsa_zero_copy.cpp2
-rw-r--r--host/lib/transport/udp_zero_copy.cpp159
9 files changed, 491 insertions, 195 deletions
diff --git a/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp b/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp
new file mode 100644
index 000000000..2e6f731c9
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/udp_boost_asio_link.hpp
@@ -0,0 +1,115 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP
+#define INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/types/device_addr.hpp>
+#include <uhdlib/transport/link_base.hpp>
+#include <uhdlib/transport/links.hpp>
+#include <uhdlib/transport/udp_common.hpp>
+#include <boost/asio.hpp>
+#include <memory>
+#include <vector>
+
+namespace uhd { namespace transport {
+
+class udp_boost_asio_frame_buff : public frame_buff
+{
+public:
+ udp_boost_asio_frame_buff(void* mem)
+ {
+ _data = mem;
+ }
+};
+
+class udp_boost_asio_link : public recv_link_base<udp_boost_asio_link>,
+ public send_link_base<udp_boost_asio_link>
+{
+public:
+ using sptr = std::shared_ptr<udp_boost_asio_link>;
+
+ /*!
+ * Make a new udp link.
+ *
+ * \param addr a string representing the destination address
+ * \param port a string representing the destination port
+ * \param params Values for frame sizes, num frames, and buffer sizes
+ * \param[out] recv_socket_buff_size Returns the recv socket buffer size
+ * \param[out] send_socket_buff_size Returns the send socket buffer size
+ */
+ static sptr make(const std::string& addr,
+ const std::string& port,
+ const link_params_t& params,
+ size_t& recv_socket_buff_size,
+ size_t& send_socket_buff_size);
+
+ /*! Return the local port of the UDP connection. Port is in host byte order.
+ *
+ * \returns Port number or 0 if port number couldn't be identified.
+ */
+ uint16_t get_local_port() const;
+
+ /*! Return the local IP address of the UDP connection as a dotted string.
+ *
+ * \returns IP address as a string or empty string if the IP address could
+ * not be identified.
+ */
+ std::string get_local_addr() const;
+
+private:
+ using recv_link_base_t = recv_link_base<udp_boost_asio_link>;
+ using send_link_base_t = send_link_base<udp_boost_asio_link>;
+
+ // Friend declarations to allow base classes to call private methods
+ friend recv_link_base_t;
+ friend send_link_base_t;
+
+ udp_boost_asio_link(
+ const std::string& addr, const std::string& port, const link_params_t& params);
+
+ size_t resize_recv_socket_buffer(size_t num_bytes);
+ size_t resize_send_socket_buffer(size_t num_bytes);
+
+ // Methods called by recv_link_base
+ UHD_FORCE_INLINE size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms)
+ {
+ return recv_udp_packet(_sock_fd, buff.data(), get_recv_frame_size(), timeout_ms);
+ }
+
+ UHD_FORCE_INLINE void release_recv_buff_derived(frame_buff& /*buff*/)
+ {
+ // No-op
+ }
+
+ // Methods called by send_link_base
+ UHD_FORCE_INLINE bool get_send_buff_derived(
+ frame_buff& /*buff*/, int32_t /*timeout_ms*/)
+ {
+ return true;
+ }
+
+ UHD_FORCE_INLINE void release_send_buff_derived(frame_buff& buff)
+ {
+ send_udp_packet(_sock_fd, buff.data(), buff.packet_size());
+ }
+
+ buffer_pool::sptr _recv_memory_pool;
+ buffer_pool::sptr _send_memory_pool;
+
+ std::vector<udp_boost_asio_frame_buff> _recv_buffs;
+ std::vector<udp_boost_asio_frame_buff> _send_buffs;
+
+ boost::asio::io_service _io_service;
+ std::shared_ptr<boost::asio::ip::udp::socket> _socket;
+ int _sock_fd;
+};
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_UHD_TRANSPORT_UDP_BOOST_ASIO_LINK_HPP */
diff --git a/host/lib/include/uhdlib/transport/udp_common.hpp b/host/lib/include/uhdlib/transport/udp_common.hpp
new file mode 100644
index 000000000..5f5a18c27
--- /dev/null
+++ b/host/lib/include/uhdlib/transport/udp_common.hpp
@@ -0,0 +1,200 @@
+//
+// Copyright 2011 Ettus Research LLC
+// Copyright 2018 Ettus Research, a National Instruments Company
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#ifndef INCLUDED_TRANSPORT_UDP_COMMON_HPP
+#define INCLUDED_TRANSPORT_UDP_COMMON_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/asio.hpp>
+#include <boost/format.hpp>
+#include <thread>
+
+namespace uhd { namespace transport {
+
+// Jumbo frames can be up to 9600 bytes;
+constexpr size_t MAX_ETHERNET_MTU = 9600;
+
+constexpr size_t UDP_DEFAULT_NUM_FRAMES = 1;
+
+// Based on common 1500 byte MTU for 1GbE
+constexpr size_t UDP_DEFAULT_FRAME_SIZE = 1472;
+
+// 20ms of data for 1GbE link (in bytes)
+constexpr size_t UDP_DEFAULT_BUFF_SIZE = 2500000;
+
+
+#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
+// MacOS limits socket buffer size to 1 Mib
+static const size_t MAX_BUFF_SIZE_ETH_MACOS = 0x100000; // 1Mib
+#endif
+
+typedef std::shared_ptr<boost::asio::ip::udp::socket> socket_sptr;
+
+/*!
+ * Wait for the socket to become ready for a receive operation.
+ * \param sock_fd the open socket file descriptor
+ * \param timeout_ms the timeout duration in milliseconds
+ * \return true when the socket is ready for receive
+ */
+UHD_INLINE bool wait_for_recv_ready(int sock_fd, int32_t timeout_ms)
+{
+#ifdef UHD_PLATFORM_WIN32 // select is more portable than poll unfortunately
+ // setup timeval for timeout
+ timeval tv;
+ // If the tv_usec > 1 second on some platforms, select will
+ // error EINVAL: An invalid timeout interval was specified.
+ tv.tv_sec = int(timeout_ms / 1000);
+ tv.tv_usec = int(timeout_ms * 1000) % 1000000;
+
+ // setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(sock_fd, &rset);
+
+// http://www.gnu.org/s/hello/manual/libc/Interrupted-Primitives.html
+// This macro is provided with gcc to properly deal with EINTR.
+// If not provided, define an empty macro, assume that is OK
+# ifndef TEMP_FAILURE_RETRY
+# define TEMP_FAILURE_RETRY(x) (x)
+# endif
+
+ // call select with timeout on receive socket
+ return TEMP_FAILURE_RETRY(::select(sock_fd + 1, &rset, NULL, NULL, &tv)) > 0;
+#else
+ pollfd pfd_read;
+ pfd_read.fd = sock_fd;
+ pfd_read.events = POLLIN;
+
+ // call poll with timeout on receive socket
+ return ::poll(&pfd_read, 1, (int)timeout_ms) > 0;
+#endif
+}
+
+UHD_INLINE socket_sptr open_udp_socket(
+ const std::string& addr, const std::string& port, boost::asio::io_service& io_service)
+{
+ using udp = boost::asio::ip::udp;
+
+ // resolve the address
+ udp::resolver resolver(io_service);
+ udp::resolver::query query(udp::v4(), addr, port);
+ udp::endpoint receiver_endpoint = *resolver.resolve(query);
+
+ // create, open, and connect the socket
+ socket_sptr socket = socket_sptr(new udp::socket(io_service));
+ socket->open(udp::v4());
+ socket->connect(receiver_endpoint);
+
+ return socket;
+}
+
+UHD_INLINE size_t recv_udp_packet(
+ int sock_fd, void* mem, size_t frame_size, int32_t timeout_ms)
+{
+ ssize_t len;
+
+#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported
+ len = ::recv(sock_fd, (char*)mem, frame_size, MSG_DONTWAIT);
+ if (len > 0) {
+ return len;
+ }
+#endif
+
+ if (wait_for_recv_ready(sock_fd, timeout_ms)) {
+ len = ::recv(sock_fd, (char*)mem, frame_size, 0);
+ if (len == 0) {
+ throw uhd::io_error("socket closed");
+ }
+ if (len < 0) {
+ throw uhd::io_error(
+ str(boost::format("recv error on socket: %s") % strerror(errno)));
+ }
+ return len;
+ }
+
+ return 0; // timeout
+}
+
+UHD_INLINE void send_udp_packet(int sock_fd, void* mem, size_t len)
+{
+ // Retry logic because send may fail with ENOBUFS.
+ // This is known to occur at least on some OSX systems.
+ // But it should be safe to always check for the error.
+ while (true) {
+ const ssize_t ret = ::send(sock_fd, (const char*)mem, len, 0);
+ if (ret == ssize_t(len))
+ break;
+ if (ret == -1 and errno == ENOBUFS) {
+ std::this_thread::sleep_for(std::chrono::microseconds(1));
+ continue; // try to send again
+ }
+ if (ret == -1) {
+ throw uhd::io_error(
+ str(boost::format("send error on socket: %s") % strerror(errno)));
+ }
+ UHD_ASSERT_THROW(ret == ssize_t(len));
+ }
+}
+
+template <typename Opt>
+size_t get_udp_socket_buffer_size(socket_sptr socket)
+{
+ Opt option;
+ socket->get_option(option);
+ return option.value();
+}
+
+template <typename Opt>
+size_t resize_udp_socket_buffer(socket_sptr socket, size_t num_bytes)
+{
+#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
+ // limit buffer resize on macos or it will error
+ num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS);
+#endif
+ Opt option(num_bytes);
+ socket->set_option(option);
+ return get_udp_socket_buffer_size<Opt>(socket);
+}
+
+UHD_INLINE size_t resize_udp_socket_buffer_with_warning(
+ std::function<size_t(size_t)> resize_fn,
+ const size_t target_size,
+ const std::string& name)
+{
+ size_t actual_size = 0;
+ std::string help_message;
+#if defined(UHD_PLATFORM_LINUX)
+ help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d")
+ % ((name == "recv") ? "r" : "w") % target_size);
+#endif /*defined(UHD_PLATFORM_LINUX)*/
+
+ // resize the buffer if size was provided
+ if (target_size > 0) {
+ actual_size = resize_fn(target_size);
+
+ UHD_LOGGER_TRACE("UDP")
+ << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name
+ % target_size % actual_size;
+ if (actual_size < target_size)
+ UHD_LOGGER_WARNING("UDP")
+ << boost::format(
+ "The %s buffer could not be resized sufficiently.\n"
+ "Target sock buff size: %d bytes.\n"
+ "Actual sock buff size: %d bytes.\n"
+ "See the transport application notes on buffer resizing.\n%s")
+ % name % target_size % actual_size % help_message;
+ }
+
+ return actual_size;
+}
+
+
+}} // namespace uhd::transport
+
+#endif /* INCLUDED_TRANSPORT_UDP_COMMON_HPP */
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index a9663c89a..003beeee4 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -81,8 +81,10 @@ set_source_files_properties(
########################################################################
if(WIN32)
LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp)
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_boost_asio_link.cpp)
else()
LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp)
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_boost_asio_link.cpp)
endif()
#On windows, the boost asio implementation uses the winsock2 library.
diff --git a/host/lib/transport/tcp_zero_copy.cpp b/host/lib/transport/tcp_zero_copy.cpp
index 5cb713427..01bca900f 100644
--- a/host/lib/transport/tcp_zero_copy.cpp
+++ b/host/lib/transport/tcp_zero_copy.cpp
@@ -5,11 +5,11 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
-#include "udp_common.hpp"
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/transport/tcp_zero_copy.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/utils/atomic.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <boost/format.hpp>
#include <boost/make_shared.hpp>
#include <chrono>
@@ -52,8 +52,9 @@ public:
return make(this, _mem, size_t(_len));
}
#endif
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
- if (wait_for_recv_ready(_sock_fd, timeout)) {
+ if (wait_for_recv_ready(_sock_fd, timeout_ms)) {
_len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0);
index++; // advances the caller's buffer
return make(this, _mem, size_t(_len));
diff --git a/host/lib/transport/udp_boost_asio_link.cpp b/host/lib/transport/udp_boost_asio_link.cpp
new file mode 100644
index 000000000..95d68ba91
--- /dev/null
+++ b/host/lib/transport/udp_boost_asio_link.cpp
@@ -0,0 +1,126 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/udp_boost_asio_link.hpp>
+#include <boost/format.hpp>
+
+using namespace uhd::transport;
+
+namespace asio = boost::asio;
+
+udp_boost_asio_link::udp_boost_asio_link(
+ const std::string& addr, const std::string& port, const link_params_t& params)
+ : recv_link_base_t(params.num_recv_frames, params.recv_frame_size)
+ , send_link_base_t(params.num_send_frames, params.send_frame_size)
+ , _recv_memory_pool(buffer_pool::make(params.num_recv_frames, params.recv_frame_size))
+ , _send_memory_pool(buffer_pool::make(params.num_send_frames, params.send_frame_size))
+{
+ for (size_t i = 0; i < params.num_recv_frames; i++) {
+ _recv_buffs.push_back(udp_boost_asio_frame_buff(_recv_memory_pool->at(i)));
+ }
+
+ for (size_t i = 0; i < params.num_send_frames; i++) {
+ _send_buffs.push_back(udp_boost_asio_frame_buff(_send_memory_pool->at(i)));
+ }
+
+ for (auto& buff : _recv_buffs) {
+ recv_link_base_t::preload_free_buff(&buff);
+ }
+
+ for (auto& buff : _send_buffs) {
+ send_link_base_t::preload_free_buff(&buff);
+ }
+
+ // create, open, and connect the socket
+ _socket = open_udp_socket(addr, port, _io_service);
+ _sock_fd = _socket->native_handle();
+
+ UHD_LOGGER_TRACE("UDP")
+ << boost::format("Created UDP link to %s:%s") % addr % port;
+ UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s")
+ % get_local_addr() % get_local_port();
+}
+
+uint16_t udp_boost_asio_link::get_local_port() const
+{
+ return _socket->local_endpoint().port();
+}
+
+std::string udp_boost_asio_link::get_local_addr() const
+{
+ return _socket->local_endpoint().address().to_string();
+}
+
+size_t udp_boost_asio_link::resize_recv_socket_buffer(size_t num_bytes)
+{
+ return resize_udp_socket_buffer<asio::socket_base::receive_buffer_size>(
+ _socket, num_bytes);
+}
+
+size_t udp_boost_asio_link::resize_send_socket_buffer(size_t num_bytes)
+{
+ return resize_udp_socket_buffer<asio::socket_base::send_buffer_size>(
+ _socket, num_bytes);
+}
+
+udp_boost_asio_link::sptr udp_boost_asio_link::make(const std::string& addr,
+ const std::string& port,
+ const link_params_t& params,
+ size_t& recv_socket_buff_size,
+ size_t& send_socket_buff_size)
+{
+ UHD_ASSERT_THROW(params.num_recv_frames != 0);
+ UHD_ASSERT_THROW(params.num_send_frames != 0);
+ UHD_ASSERT_THROW(params.recv_frame_size != 0);
+ UHD_ASSERT_THROW(params.send_frame_size != 0);
+ UHD_ASSERT_THROW(params.recv_buff_size != 0);
+ UHD_ASSERT_THROW(params.send_buff_size != 0);
+
+#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
+ // limit buffer size on macos to avoid the warning issued by
+ // resize_buff_helper
+ if (params.recv_buff_size > MAX_BUFF_SIZE_ETH_MACOS) {
+ params.recv_buff_size = MAX_BUFF_SIZE_ETH_MACOS;
+ }
+ if (params.send_buff_size > MAX_BUFF_SIZE_ETH_MACOS) {
+ params.send_buff_size = MAX_BUFF_SIZE_ETH_MACOS;
+ }
+#endif
+
+ udp_boost_asio_link::sptr link(
+ new udp_boost_asio_link(addr, port, params));
+
+ // call the helper to resize send and recv buffers
+
+ recv_socket_buff_size = resize_udp_socket_buffer_with_warning(
+ [link](size_t size) { return link->resize_recv_socket_buffer(size); },
+ params.recv_buff_size,
+ "recv");
+ send_socket_buff_size = resize_udp_socket_buffer_with_warning(
+ [link](size_t size) { return link->resize_send_socket_buffer(size); },
+ params.send_buff_size,
+ "send");
+
+ if (recv_socket_buff_size < params.num_recv_frames * MAX_ETHERNET_MTU) {
+ UHD_LOG_WARNING("UDP",
+ "The current recv_buff_size of "
+ << params.recv_buff_size
+ << " is less than the minimum recommended size of "
+ << params.num_recv_frames * MAX_ETHERNET_MTU
+ << " and may result in dropped packets on some NICs");
+ }
+ if (send_socket_buff_size < params.num_send_frames * MAX_ETHERNET_MTU) {
+ UHD_LOG_WARNING("UDP",
+ "The current send_buff_size of "
+ << params.send_buff_size
+ << " is less than the minimum recommended size of "
+ << params.num_send_frames * MAX_ETHERNET_MTU
+ << " and may result in dropped packets on some NICs");
+ }
+
+ return link;
+}
diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp
deleted file mode 100644
index f320e3d85..000000000
--- a/host/lib/transport/udp_common.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-//
-// Copyright 2011 Ettus Research LLC
-// Copyright 2018 Ettus Research, a National Instruments Company
-//
-// SPDX-License-Identifier: GPL-3.0-or-later
-//
-
-#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP
-#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP
-
-#include <uhd/config.hpp>
-#include <boost/asio.hpp>
-
-namespace uhd { namespace transport {
-
-// Jumbo frames can be up to 9600 bytes;
-static const size_t MAX_ETHERNET_MTU = 9600;
-
-#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
-// MacOS limits socket buffer size to 1 Mib
-static const size_t MAX_BUFF_SIZE_ETH_MACOS = 0x100000; // 1Mib
-#endif
-
-typedef boost::shared_ptr<boost::asio::ip::udp::socket> socket_sptr;
-
-/*!
- * Wait for the socket to become ready for a receive operation.
- * \param sock_fd the open socket file descriptor
- * \param timeout the timeout duration in seconds
- * \return true when the socket is ready for receive
- */
-UHD_INLINE bool wait_for_recv_ready(int sock_fd, double timeout)
-{
-#ifdef UHD_PLATFORM_WIN32 // select is more portable than poll unfortunately
- // setup timeval for timeout
- timeval tv;
- // If the tv_usec > 1 second on some platforms, select will
- // error EINVAL: An invalid timeout interval was specified.
- tv.tv_sec = int(timeout);
- tv.tv_usec = int(timeout * 1000000) % 1000000;
-
- // setup rset for timeout
- fd_set rset;
- FD_ZERO(&rset);
- FD_SET(sock_fd, &rset);
-
-// http://www.gnu.org/s/hello/manual/libc/Interrupted-Primitives.html
-// This macro is provided with gcc to properly deal with EINTR.
-// If not provided, define an empty macro, assume that is OK
-# ifndef TEMP_FAILURE_RETRY
-# define TEMP_FAILURE_RETRY(x) (x)
-# endif
-
- // call select with timeout on receive socket
- return TEMP_FAILURE_RETRY(::select(sock_fd + 1, &rset, NULL, NULL, &tv)) > 0;
-#else
- // calculate the total timeout in milliseconds (from seconds)
- int total_timeout = int(timeout * 1000);
-
- pollfd pfd_read;
- pfd_read.fd = sock_fd;
- pfd_read.events = POLLIN;
-
- // call poll with timeout on receive socket
- return ::poll(&pfd_read, 1, total_timeout) > 0;
-#endif
-}
-
-}} // namespace uhd::transport
-
-#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */
diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp
index 48d7b500e..e10dff7f8 100644
--- a/host/lib/transport/udp_simple.cpp
+++ b/host/lib/transport/udp_simple.cpp
@@ -5,9 +5,9 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
-#include "udp_common.hpp"
#include <uhd/transport/udp_simple.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <boost/format.hpp>
using namespace uhd::transport;
@@ -53,7 +53,9 @@ public:
size_t recv(const asio::mutable_buffer& buff, double timeout)
{
- if (not wait_for_recv_ready(_socket->native_handle(), timeout))
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+
+ if (not wait_for_recv_ready(_socket->native_handle(), timeout_ms))
return 0;
return _socket->receive_from(asio::buffer(buff), _recv_endpoint);
}
diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp
index 8f83ea5ef..36837cda4 100644
--- a/host/lib/transport/udp_wsa_zero_copy.cpp
+++ b/host/lib/transport/udp_wsa_zero_copy.cpp
@@ -5,11 +5,11 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
-#include "udp_common.hpp"
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <boost/format.hpp>
#include <vector>
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp
index e3df80da6..44df9526d 100644
--- a/host/lib/transport/udp_zero_copy.cpp
+++ b/host/lib/transport/udp_zero_copy.cpp
@@ -5,11 +5,10 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
-#include "udp_common.hpp"
#include <uhd/transport/buffer_pool.hpp>
-#include <uhd/transport/udp_simple.hpp> //mtu
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/utils/log.hpp>
+#include <uhdlib/transport/udp_common.hpp>
#include <uhdlib/utils/atomic.hpp>
#include <boost/format.hpp>
#include <boost/make_shared.hpp>
@@ -19,12 +18,8 @@
using namespace uhd;
using namespace uhd::transport;
-namespace asio = boost::asio;
-constexpr size_t UDP_ZERO_COPY_DEFAULT_NUM_FRAMES = 1;
-constexpr size_t UDP_ZERO_COPY_DEFAULT_FRAME_SIZE =
- 1472; // Based on common 1500 byte MTU for 1GbE.
-constexpr size_t UDP_ZERO_COPY_DEFAULT_BUFF_SIZE =
- 2500000; // 20ms of data for 1GbE link (in bytes)
+namespace asio = boost::asio;
+
/***********************************************************************
* Check registry for correct fast-path setting (windows only)
**********************************************************************/
@@ -80,22 +75,11 @@ public:
if (not _claimer.claim_with_wait(timeout))
return sptr();
-#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported
- _len = ::recv(_sock_fd, (char*)_mem, _frame_size, MSG_DONTWAIT);
- if (_len > 0) {
- index++; // advances the caller's buffer
- return make(this, _mem, size_t(_len));
- }
-#endif
+ const int32_t timeout_ms = static_cast<int32_t>(timeout * 1000);
+ _len = recv_udp_packet(_sock_fd, _mem, _frame_size, timeout_ms);
- if (wait_for_recv_ready(_sock_fd, timeout)) {
- _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0);
- if (_len == 0)
- throw uhd::io_error("socket closed");
- if (_len < 0)
- throw uhd::io_error(
- str(boost::format("recv error on socket: %s") % strerror(errno)));
- index++; // advances the caller's buffer
+ if (_len > 0) {
+ index++;
return make(this, _mem, size_t(_len));
}
@@ -125,23 +109,7 @@ public:
void release(void)
{
- // Retry logic because send may fail with ENOBUFS.
- // This is known to occur at least on some OSX systems.
- // But it should be safe to always check for the error.
- while (true) {
- const ssize_t ret = ::send(_sock_fd, (const char*)_mem, size(), 0);
- if (ret == ssize_t(size()))
- break;
- if (ret == -1 and errno == ENOBUFS) {
- std::this_thread::sleep_for(std::chrono::microseconds(1));
- continue; // try to send again
- }
- if (ret == -1) {
- throw uhd::io_error(
- str(boost::format("send error on socket: %s") % strerror(errno)));
- }
- UHD_ASSERT_THROW(ret == ssize_t(size()));
- }
+ send_udp_packet(_sock_fd, _mem, size());
_claimer.release();
}
@@ -193,15 +161,7 @@ public:
check_registry_for_fast_send_threshold(this->get_send_frame_size());
#endif /*CHECK_REG_SEND_THRESH*/
- // resolve the address
- asio::ip::udp::resolver resolver(_io_service);
- asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
- asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
-
- // create, open, and connect the socket
- _socket = socket_sptr(new asio::ip::udp::socket(_io_service));
- _socket->open(asio::ip::udp::v4());
- _socket->connect(receiver_endpoint);
+ _socket = open_udp_socket(addr, port, _io_service);
_sock_fd = _socket->native_handle();
UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s")
@@ -220,24 +180,16 @@ public:
}
}
- // get size for internal socket buffer
- template <typename Opt> size_t get_buff_size(void) const
+ size_t resize_send_socket_buffer(size_t num_bytes)
{
- Opt option;
- _socket->get_option(option);
- return option.value();
+ return resize_udp_socket_buffer<asio::socket_base::send_buffer_size>(
+ _socket, num_bytes);
}
- // set size for internal socket buffer
- template <typename Opt> size_t resize_buff(size_t num_bytes)
+ size_t resize_recv_socket_buffer(size_t num_bytes)
{
-#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
- // limit buffer resize on macos or it will error
- num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS);
-#endif
- Opt option(num_bytes);
- _socket->set_option(option);
- return get_buff_size<Opt>();
+ return resize_udp_socket_buffer<asio::socket_base::receive_buffer_size>(
+ _socket, num_bytes);
}
/*******************************************************************
@@ -308,37 +260,6 @@ private:
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
-template <typename Opt>
-static size_t resize_buff_helper(udp_zero_copy_asio_impl::sptr udp_trans,
- const size_t target_size,
- const std::string& name)
-{
- size_t actual_size = 0;
- std::string help_message;
-#if defined(UHD_PLATFORM_LINUX)
- help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d")
- % ((name == "recv") ? "r" : "w") % target_size);
-#endif /*defined(UHD_PLATFORM_LINUX)*/
-
- // resize the buffer if size was provided
- if (target_size > 0) {
- actual_size = udp_trans->resize_buff<Opt>(target_size);
- UHD_LOGGER_TRACE("UDP")
- << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name
- % target_size % actual_size;
- if (actual_size < target_size)
- UHD_LOGGER_WARNING("UDP")
- << boost::format(
- "The %s buffer could not be resized sufficiently.\n"
- "Target sock buff size: %d bytes.\n"
- "Actual sock buff size: %d bytes.\n"
- "See the transport application notes on buffer resizing.\n%s")
- % name % target_size % actual_size % help_message;
- }
-
- return actual_size;
-}
-
udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,
const std::string& port,
const zero_copy_xport_params& default_buff_args,
@@ -362,26 +283,24 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,
size_t(hints.cast<double>("send_buff_size", default_buff_args.send_buff_size));
if (xport_params.num_recv_frames == 0) {
- UHD_LOG_TRACE("UDP",
- "Default value for num_recv_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES);
- xport_params.num_recv_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES;
+ UHD_LOG_TRACE(
+ "UDP", "Default value for num_recv_frames: " << UDP_DEFAULT_NUM_FRAMES);
+ xport_params.num_recv_frames = UDP_DEFAULT_NUM_FRAMES;
}
if (xport_params.num_send_frames == 0) {
- UHD_LOG_TRACE("UDP",
- "Default value for no num_send_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES);
- xport_params.num_send_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES;
+ UHD_LOG_TRACE(
+ "UDP", "Default value for no num_send_frames: " << UDP_DEFAULT_NUM_FRAMES);
+ xport_params.num_send_frames = UDP_DEFAULT_NUM_FRAMES;
}
if (xport_params.recv_frame_size == 0) {
UHD_LOG_TRACE("UDP",
- "Using default value for recv_frame_size: "
- << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE);
- xport_params.recv_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE;
+ "Using default value for recv_frame_size: " << UDP_DEFAULT_FRAME_SIZE);
+ xport_params.recv_frame_size = UDP_DEFAULT_FRAME_SIZE;
}
if (xport_params.send_frame_size == 0) {
- UHD_LOG_TRACE("UDP",
- "Using default value for send_frame_size, "
- << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE);
- xport_params.send_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE;
+ UHD_LOG_TRACE(
+ "UDP", "Using default value for send_frame_size, " << UDP_DEFAULT_FRAME_SIZE);
+ xport_params.send_frame_size = UDP_DEFAULT_FRAME_SIZE;
}
UHD_LOG_TRACE("UDP",
@@ -391,15 +310,15 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,
if (xport_params.recv_buff_size == 0) {
UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size");
- xport_params.recv_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE,
- xport_params.num_recv_frames * MAX_ETHERNET_MTU);
+ xport_params.recv_buff_size = std::max(
+ UDP_DEFAULT_BUFF_SIZE, xport_params.num_recv_frames * MAX_ETHERNET_MTU);
UHD_LOG_TRACE("UDP",
"Using default value for recv_buff_size" << xport_params.recv_buff_size);
}
if (xport_params.send_buff_size == 0) {
UHD_LOG_TRACE("UDP", "default_buff_args has no send_buff_size");
- xport_params.send_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE,
- xport_params.num_send_frames * MAX_ETHERNET_MTU);
+ xport_params.send_buff_size = std::max(
+ UDP_DEFAULT_BUFF_SIZE, xport_params.num_send_frames * MAX_ETHERNET_MTU);
}
#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
@@ -419,18 +338,20 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,
new udp_zero_copy_asio_impl(addr, port, xport_params));
// call the helper to resize send and recv buffers
- buff_params_out.recv_buff_size =
- resize_buff_helper<asio::socket_base::receive_buffer_size>(
- udp_trans, xport_params.recv_buff_size, "recv");
- buff_params_out.send_buff_size =
- resize_buff_helper<asio::socket_base::send_buffer_size>(
- udp_trans, xport_params.send_buff_size, "send");
+ buff_params_out.recv_buff_size = resize_udp_socket_buffer_with_warning(
+ [udp_trans](size_t size) { return udp_trans->resize_recv_socket_buffer(size); },
+ xport_params.recv_buff_size,
+ "recv");
+ buff_params_out.send_buff_size = resize_udp_socket_buffer_with_warning(
+ [udp_trans](size_t size) { return udp_trans->resize_send_socket_buffer(size); },
+ xport_params.send_buff_size,
+ "send");
if (buff_params_out.recv_buff_size
< xport_params.num_recv_frames * MAX_ETHERNET_MTU) {
UHD_LOG_WARNING("UDP",
"The current recv_buff_size of "
- << buff_params_out.recv_buff_size
+ << xport_params.recv_buff_size
<< " is less than the minimum recommended size of "
<< xport_params.num_recv_frames * MAX_ETHERNET_MTU
<< " and may result in dropped packets on some NICs");
@@ -439,7 +360,7 @@ udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr,
< xport_params.num_send_frames * MAX_ETHERNET_MTU) {
UHD_LOG_WARNING("UDP",
"The current send_buff_size of "
- << buff_params_out.send_buff_size
+ << xport_params.send_buff_size
<< " is less than the minimum recommended size of "
<< xport_params.num_send_frames * MAX_ETHERNET_MTU
<< " and may result in dropped packets on some NICs");