diff options
Diffstat (limited to 'host/lib/transport/udp_zero_copy.cpp')
-rw-r--r-- | host/lib/transport/udp_zero_copy.cpp | 439 |
1 files changed, 239 insertions, 200 deletions
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index 2b637639b..1ccee72a2 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -6,45 +6,52 @@ // #include "udp_common.hpp" -#include <uhd/transport/udp_zero_copy.hpp> -#include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/buffer_pool.hpp> - +#include <uhd/transport/udp_simple.hpp> //mtu +#include <uhd/transport/udp_zero_copy.hpp> #include <uhd/utils/log.hpp> #include <uhdlib/utils/atomic.hpp> #include <boost/format.hpp> #include <boost/make_shared.hpp> -#include <vector> #include <chrono> #include <thread> +#include <vector> using namespace uhd; using namespace uhd::transport; -namespace asio = boost::asio; +namespace asio = boost::asio; constexpr size_t UDP_ZERO_COPY_DEFAULT_NUM_FRAMES = 1; -constexpr size_t UDP_ZERO_COPY_DEFAULT_FRAME_SIZE = 1472; // Based on common 1500 byte MTU for 1GbE. -constexpr size_t UDP_ZERO_COPY_DEFAULT_BUFF_SIZE = 2500000; // 20ms of data for 1GbE link (in bytes) +constexpr size_t UDP_ZERO_COPY_DEFAULT_FRAME_SIZE = + 1472; // Based on common 1500 byte MTU for 1GbE. +constexpr size_t UDP_ZERO_COPY_DEFAULT_BUFF_SIZE = + 2500000; // 20ms of data for 1GbE link (in bytes) /*********************************************************************** * Check registry for correct fast-path setting (windows only) **********************************************************************/ #ifdef HAVE_ATLBASE_H -#define CHECK_REG_SEND_THRESH -#include <atlbase.h> //CRegKey -static void check_registry_for_fast_send_threshold(const size_t mtu){ +# define CHECK_REG_SEND_THRESH +# include <atlbase.h> //CRegKey +static void check_registry_for_fast_send_threshold(const size_t mtu) +{ static bool warned = false; - if (warned) return; //only allow one printed warning per process + if (warned) + return; // only allow one printed warning per process CRegKey reg_key; - DWORD threshold = 1024; //system default when threshold is not specified - if ( - reg_key.Open(HKEY_LOCAL_MACHINE, "System\\CurrentControlSet\\Services\\AFD\\Parameters", KEY_READ) != ERROR_SUCCESS or - reg_key.QueryDWORDValue("FastSendDatagramThreshold", threshold) != ERROR_SUCCESS or threshold < mtu - ){ - UHD_LOGGER_WARNING("UDP") << boost::format( - "The MTU (%d) is larger than the FastSendDatagramThreshold (%d)!\n" - "This will negatively affect the transmit performance.\n" - "See the transport application notes for more detail.\n" - ) % mtu % threshold ; + DWORD threshold = 1024; // system default when threshold is not specified + if (reg_key.Open(HKEY_LOCAL_MACHINE, + "System\\CurrentControlSet\\Services\\AFD\\Parameters", + KEY_READ) + != ERROR_SUCCESS + or reg_key.QueryDWORDValue("FastSendDatagramThreshold", threshold) + != ERROR_SUCCESS + or threshold < mtu) { + UHD_LOGGER_WARNING("UDP") + << boost::format( + "The MTU (%d) is larger than the FastSendDatagramThreshold (%d)!\n" + "This will negatively affect the transmit performance.\n" + "See the transport application notes for more detail.\n") + % mtu % threshold; warned = true; } reg_key.Close(); @@ -55,42 +62,49 @@ static void check_registry_for_fast_send_threshold(const size_t mtu){ * Reusable managed receiver buffer: * - get_new performs the recv operation **********************************************************************/ -class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +class udp_zero_copy_asio_mrb : public managed_recv_buffer +{ public: - udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size): - _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size), _len(0) { /*NOP*/ } + udp_zero_copy_asio_mrb(void* mem, int sock_fd, const size_t frame_size) + : _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size), _len(0) + { /*NOP*/ + } - void release(void){ + void release(void) + { _claimer.release(); } - UHD_INLINE sptr get_new(const double timeout, size_t &index){ - if (not _claimer.claim_with_wait(timeout)) return sptr(); + UHD_INLINE sptr get_new(const double timeout, size_t& index) + { + if (not _claimer.claim_with_wait(timeout)) + return sptr(); - #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported - _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT); - if (_len > 0){ - index++; //advances the caller's buffer +#ifdef MSG_DONTWAIT // try a non-blocking recv() if supported + _len = ::recv(_sock_fd, (char*)_mem, _frame_size, MSG_DONTWAIT); + if (_len > 0) { + index++; // advances the caller's buffer return make(this, _mem, size_t(_len)); } - #endif +#endif - if (wait_for_recv_ready(_sock_fd, timeout)){ - _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0); + if (wait_for_recv_ready(_sock_fd, timeout)) { + _len = ::recv(_sock_fd, (char*)_mem, _frame_size, 0); if (_len == 0) throw uhd::io_error("socket closed"); if (_len < 0) - throw uhd::io_error(str(boost::format("recv error on socket: %s") % strerror(errno))); - index++; //advances the caller's buffer + throw uhd::io_error( + str(boost::format("recv error on socket: %s") % strerror(errno))); + index++; // advances the caller's buffer return make(this, _mem, size_t(_len)); } - _claimer.release(); //undo claim - return sptr(); //null for timeout + _claimer.release(); // undo claim + return sptr(); // null for timeout } private: - void *_mem; + void* _mem; int _sock_fd; size_t _frame_size; ssize_t _len; @@ -101,41 +115,46 @@ private: * Reusable managed send buffer: * - commit performs the send operation **********************************************************************/ -class udp_zero_copy_asio_msb : public managed_send_buffer{ +class udp_zero_copy_asio_msb : public managed_send_buffer +{ public: - udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size): - _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ } - - void release(void){ - //Retry logic because send may fail with ENOBUFS. - //This is known to occur at least on some OSX systems. - //But it should be safe to always check for the error. - while (true) - { - const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0); - if (ret == ssize_t(size())) break; - if (ret == -1 and errno == ENOBUFS) - { + udp_zero_copy_asio_msb(void* mem, int sock_fd, const size_t frame_size) + : _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) + { /*NOP*/ + } + + void release(void) + { + // Retry logic because send may fail with ENOBUFS. + // This is known to occur at least on some OSX systems. + // But it should be safe to always check for the error. + while (true) { + const ssize_t ret = ::send(_sock_fd, (const char*)_mem, size(), 0); + if (ret == ssize_t(size())) + break; + if (ret == -1 and errno == ENOBUFS) { std::this_thread::sleep_for(std::chrono::microseconds(1)); - continue; //try to send again + continue; // try to send again } - if (ret == -1) - { - throw uhd::io_error(str(boost::format("send error on socket: %s") % strerror(errno))); + if (ret == -1) { + throw uhd::io_error( + str(boost::format("send error on socket: %s") % strerror(errno))); } UHD_ASSERT_THROW(ret == ssize_t(size())); } _claimer.release(); } - UHD_INLINE sptr get_new(const double timeout, size_t &index){ - if (not _claimer.claim_with_wait(timeout)) return sptr(); - index++; //advances the caller's buffer + UHD_INLINE sptr get_new(const double timeout, size_t& index) + { + if (not _claimer.claim_with_wait(timeout)) + return sptr(); + index++; // advances the caller's buffer return make(this, _mem, _frame_size); } private: - void *_mem; + void* _mem; int _sock_fd; size_t _frame_size; simple_claimer _claimer; @@ -148,74 +167,74 @@ private: * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class udp_zero_copy_asio_impl : public udp_zero_copy{ +class udp_zero_copy_asio_impl : public udp_zero_copy +{ public: typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; - udp_zero_copy_asio_impl( - const std::string &addr, - const std::string &port, - const zero_copy_xport_params& xport_params - ): - _recv_frame_size(xport_params.recv_frame_size), - _num_recv_frames(xport_params.num_recv_frames), - _send_frame_size(xport_params.send_frame_size), - _num_send_frames(xport_params.num_send_frames), - _recv_buffer_pool(buffer_pool::make(xport_params.num_recv_frames, xport_params.recv_frame_size)), - _send_buffer_pool(buffer_pool::make(xport_params.num_send_frames, xport_params.send_frame_size)), - _next_recv_buff_index(0), _next_send_buff_index(0) + udp_zero_copy_asio_impl(const std::string& addr, + const std::string& port, + const zero_copy_xport_params& xport_params) + : _recv_frame_size(xport_params.recv_frame_size) + , _num_recv_frames(xport_params.num_recv_frames) + , _send_frame_size(xport_params.send_frame_size) + , _num_send_frames(xport_params.num_send_frames) + , _recv_buffer_pool(buffer_pool::make( + xport_params.num_recv_frames, xport_params.recv_frame_size)) + , _send_buffer_pool(buffer_pool::make( + xport_params.num_send_frames, xport_params.send_frame_size)) + , _next_recv_buff_index(0) + , _next_send_buff_index(0) { UHD_LOGGER_TRACE("UDP") << boost::format("Creating UDP transport to %s:%s") % addr % port; - #ifdef CHECK_REG_SEND_THRESH +#ifdef CHECK_REG_SEND_THRESH check_registry_for_fast_send_threshold(this->get_send_frame_size()); - #endif /*CHECK_REG_SEND_THRESH*/ +#endif /*CHECK_REG_SEND_THRESH*/ - //resolve the address + // resolve the address asio::ip::udp::resolver resolver(_io_service); asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - //create, open, and connect the socket + // create, open, and connect the socket _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); _sock_fd = _socket->native_handle(); - UHD_LOGGER_TRACE("UDP") - << boost::format("Local UDP socket endpoint: %s:%s") - % get_local_addr() % get_local_port(); + UHD_LOGGER_TRACE("UDP") << boost::format("Local UDP socket endpoint: %s:%s") + % get_local_addr() % get_local_port(); - //allocate re-usable managed receive buffers - for (size_t i = 0; i < get_num_recv_frames(); i++){ + // allocate re-usable managed receive buffers + for (size_t i = 0; i < get_num_recv_frames(); i++) { _mrb_pool.push_back(boost::make_shared<udp_zero_copy_asio_mrb>( - _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size() - )); + _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size())); } - //allocate re-usable managed send buffers - for (size_t i = 0; i < get_num_send_frames(); i++){ + // allocate re-usable managed send buffers + for (size_t i = 0; i < get_num_send_frames(); i++) { _msb_pool.push_back(boost::make_shared<udp_zero_copy_asio_msb>( - _send_buffer_pool->at(i), _sock_fd, get_send_frame_size() - )); + _send_buffer_pool->at(i), _sock_fd, get_send_frame_size())); } } - //get size for internal socket buffer - template <typename Opt> size_t get_buff_size(void) const{ + // get size for internal socket buffer + template <typename Opt> size_t get_buff_size(void) const + { Opt option; _socket->get_option(option); return option.value(); } - //set size for internal socket buffer + // set size for internal socket buffer template <typename Opt> size_t resize_buff(size_t num_bytes) { - #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) - //limit buffer resize on macos or it will error - num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS); - #endif +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + // limit buffer resize on macos or it will error + num_bytes = std::min(num_bytes, MAX_BUFF_SIZE_ETH_MACOS); +#endif Opt option(num_bytes); _socket->set_option(option); return get_buff_size<Opt>(); @@ -225,25 +244,41 @@ public: * Receive implementation: * Block on the managed buffer's get call and advance the index. ******************************************************************/ - managed_recv_buffer::sptr get_recv_buff(double timeout){ - if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0; + managed_recv_buffer::sptr get_recv_buff(double timeout) + { + if (_next_recv_buff_index == _num_recv_frames) + _next_recv_buff_index = 0; return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); } - size_t get_num_recv_frames(void) const {return _num_recv_frames;} - size_t get_recv_frame_size(void) const {return _recv_frame_size;} + size_t get_num_recv_frames(void) const + { + return _num_recv_frames; + } + size_t get_recv_frame_size(void) const + { + return _recv_frame_size; + } /******************************************************************* * Send implementation: * Block on the managed buffer's get call and advance the index. ******************************************************************/ - managed_send_buffer::sptr get_send_buff(double timeout){ - if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; + managed_send_buffer::sptr get_send_buff(double timeout) + { + if (_next_send_buff_index == _num_send_frames) + _next_send_buff_index = 0; return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); } - size_t get_num_send_frames(void) const {return _num_send_frames;} - size_t get_send_frame_size(void) const {return _send_frame_size;} + size_t get_num_send_frames(void) const + { + return _num_send_frames; + } + size_t get_send_frame_size(void) const + { + return _send_frame_size; + } uint16_t get_local_port(void) const { @@ -256,149 +291,153 @@ public: } private: - //memory management -> buffers and fifos + // memory management -> buffers and fifos const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; - std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool; - std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_msb>> _msb_pool; + std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb>> _mrb_pool; size_t _next_recv_buff_index, _next_send_buff_index; - //asio guts -> socket and service - asio::io_service _io_service; - socket_sptr _socket; - int _sock_fd; + // asio guts -> socket and service + asio::io_service _io_service; + socket_sptr _socket; + int _sock_fd; }; /*********************************************************************** * UDP zero copy make function **********************************************************************/ -template<typename Opt> static size_t resize_buff_helper( - udp_zero_copy_asio_impl::sptr udp_trans, +template <typename Opt> +static size_t resize_buff_helper(udp_zero_copy_asio_impl::sptr udp_trans, const size_t target_size, - const std::string &name -){ + const std::string& name) +{ size_t actual_size = 0; std::string help_message; - #if defined(UHD_PLATFORM_LINUX) - help_message = str(boost::format( - "Please run: sudo sysctl -w net.core.%smem_max=%d" - ) % ((name == "recv")?"r":"w") % target_size); - #endif /*defined(UHD_PLATFORM_LINUX)*/ - - //resize the buffer if size was provided - if (target_size > 0){ +#if defined(UHD_PLATFORM_LINUX) + help_message = str(boost::format("Please run: sudo sysctl -w net.core.%smem_max=%d") + % ((name == "recv") ? "r" : "w") % target_size); +#endif /*defined(UHD_PLATFORM_LINUX)*/ + + // resize the buffer if size was provided + if (target_size > 0) { actual_size = udp_trans->resize_buff<Opt>(target_size); UHD_LOGGER_TRACE("UDP") - << boost::format("Target/actual %s sock buff size: %d/%d bytes") - % name - % target_size - % actual_size - ; - if (actual_size < target_size) UHD_LOGGER_WARNING("UDP") << boost::format( - "The %s buffer could not be resized sufficiently.\n" - "Target sock buff size: %d bytes.\n" - "Actual sock buff size: %d bytes.\n" - "See the transport application notes on buffer resizing.\n%s" - ) % name % target_size % actual_size % help_message; + << boost::format("Target/actual %s sock buff size: %d/%d bytes") % name + % target_size % actual_size; + if (actual_size < target_size) + UHD_LOGGER_WARNING("UDP") + << boost::format( + "The %s buffer could not be resized sufficiently.\n" + "Target sock buff size: %d bytes.\n" + "Actual sock buff size: %d bytes.\n" + "See the transport application notes on buffer resizing.\n%s") + % name % target_size % actual_size % help_message; } return actual_size; } -udp_zero_copy::sptr udp_zero_copy::make( - const std::string &addr, - const std::string &port, - const zero_copy_xport_params &default_buff_args, +udp_zero_copy::sptr udp_zero_copy::make(const std::string& addr, + const std::string& port, + const zero_copy_xport_params& default_buff_args, udp_zero_copy::buff_params& buff_params_out, - const device_addr_t &hints -){ - //Initialize xport_params + const device_addr_t& hints) +{ + // Initialize xport_params zero_copy_xport_params xport_params = default_buff_args; - xport_params.recv_frame_size = size_t(hints.cast<double>("recv_frame_size", default_buff_args.recv_frame_size)); - xport_params.num_recv_frames = size_t(hints.cast<double>("num_recv_frames", default_buff_args.num_recv_frames)); - xport_params.send_frame_size = size_t(hints.cast<double>("send_frame_size", default_buff_args.send_frame_size)); - xport_params.num_send_frames = size_t(hints.cast<double>("num_send_frames", default_buff_args.num_send_frames)); - xport_params.recv_buff_size = size_t(hints.cast<double>("recv_buff_size", default_buff_args.recv_buff_size)); - xport_params.send_buff_size = size_t(hints.cast<double>("send_buff_size", default_buff_args.send_buff_size)); + xport_params.recv_frame_size = + size_t(hints.cast<double>("recv_frame_size", default_buff_args.recv_frame_size)); + xport_params.num_recv_frames = + size_t(hints.cast<double>("num_recv_frames", default_buff_args.num_recv_frames)); + xport_params.send_frame_size = + size_t(hints.cast<double>("send_frame_size", default_buff_args.send_frame_size)); + xport_params.num_send_frames = + size_t(hints.cast<double>("num_send_frames", default_buff_args.num_send_frames)); + xport_params.recv_buff_size = + size_t(hints.cast<double>("recv_buff_size", default_buff_args.recv_buff_size)); + xport_params.send_buff_size = + size_t(hints.cast<double>("send_buff_size", default_buff_args.send_buff_size)); if (xport_params.num_recv_frames == 0) { - UHD_LOG_TRACE("UDP", "Default value for num_recv_frames: " - << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES - ); + UHD_LOG_TRACE("UDP", + "Default value for num_recv_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); xport_params.num_recv_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; } if (xport_params.num_send_frames == 0) { - UHD_LOG_TRACE("UDP", "Default value for no num_send_frames: " - << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES - ); + UHD_LOG_TRACE("UDP", + "Default value for no num_send_frames: " << UDP_ZERO_COPY_DEFAULT_NUM_FRAMES); xport_params.num_send_frames = UDP_ZERO_COPY_DEFAULT_NUM_FRAMES; } if (xport_params.recv_frame_size == 0) { - UHD_LOG_TRACE("UDP", "Using default value for recv_frame_size: " - << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE - ); + UHD_LOG_TRACE("UDP", + "Using default value for recv_frame_size: " + << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); xport_params.recv_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; } if (xport_params.send_frame_size == 0) { - UHD_LOG_TRACE("UDP", "Using default value for send_frame_size, " - << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE - ); + UHD_LOG_TRACE("UDP", + "Using default value for send_frame_size, " + << UDP_ZERO_COPY_DEFAULT_FRAME_SIZE); xport_params.send_frame_size = UDP_ZERO_COPY_DEFAULT_FRAME_SIZE; } if (xport_params.recv_buff_size == 0) { UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size"); - xport_params.recv_buff_size = std::max( - UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, - xport_params.num_recv_frames*MAX_ETHERNET_MTU - ); - UHD_LOG_TRACE("UDP", "Using default value for recv_buff_size" - << xport_params.recv_buff_size - ); + xport_params.recv_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, + xport_params.num_recv_frames * MAX_ETHERNET_MTU); + UHD_LOG_TRACE("UDP", + "Using default value for recv_buff_size" << xport_params.recv_buff_size); } if (xport_params.send_buff_size == 0) { UHD_LOG_TRACE("UDP", "default_buff_args has no send_buff_size"); - xport_params.send_buff_size = std::max( - UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, - xport_params.num_send_frames*MAX_ETHERNET_MTU - ); + xport_params.send_buff_size = std::max(UDP_ZERO_COPY_DEFAULT_BUFF_SIZE, + xport_params.num_send_frames * MAX_ETHERNET_MTU); } - #if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) - //limit default buffer size on macos to avoid the warning issued by resize_buff_helper - if (not hints.has_key("recv_buff_size") and xport_params.recv_buff_size > MAX_BUFF_SIZE_ETH_MACOS) - { - xport_params.recv_buff_size = MAX_BUFF_SIZE_ETH_MACOS; - } - if (not hints.has_key("send_buff_size") and xport_params.send_buff_size > MAX_BUFF_SIZE_ETH_MACOS) - { - xport_params.send_buff_size = MAX_BUFF_SIZE_ETH_MACOS; - } - #endif +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + // limit default buffer size on macos to avoid the warning issued by + // resize_buff_helper + if (not hints.has_key("recv_buff_size") + and xport_params.recv_buff_size > MAX_BUFF_SIZE_ETH_MACOS) { + xport_params.recv_buff_size = MAX_BUFF_SIZE_ETH_MACOS; + } + if (not hints.has_key("send_buff_size") + and xport_params.send_buff_size > MAX_BUFF_SIZE_ETH_MACOS) { + xport_params.send_buff_size = MAX_BUFF_SIZE_ETH_MACOS; + } +#endif udp_zero_copy_asio_impl::sptr udp_trans( - new udp_zero_copy_asio_impl(addr, port, xport_params) - ); - - //call the helper to resize send and recv buffers - buff_params_out.recv_buff_size = resize_buff_helper<asio::socket_base::receive_buffer_size>( - udp_trans, xport_params.recv_buff_size, "recv"); - buff_params_out.send_buff_size = resize_buff_helper<asio::socket_base::send_buffer_size> ( - udp_trans, xport_params.send_buff_size, "send"); - - if (buff_params_out.recv_buff_size < xport_params.num_recv_frames * MAX_ETHERNET_MTU){ - UHD_LOG_WARNING("UDP", "The current recv_buff_size of " << xport_params.recv_buff_size - << " is less than the minimum recommended size of " - << xport_params.num_recv_frames * MAX_ETHERNET_MTU - << " and may result in dropped packets on some NICs"); + new udp_zero_copy_asio_impl(addr, port, xport_params)); + + // call the helper to resize send and recv buffers + buff_params_out.recv_buff_size = + resize_buff_helper<asio::socket_base::receive_buffer_size>( + udp_trans, xport_params.recv_buff_size, "recv"); + buff_params_out.send_buff_size = + resize_buff_helper<asio::socket_base::send_buffer_size>( + udp_trans, xport_params.send_buff_size, "send"); + + if (buff_params_out.recv_buff_size + < xport_params.num_recv_frames * MAX_ETHERNET_MTU) { + UHD_LOG_WARNING("UDP", + "The current recv_buff_size of " + << xport_params.recv_buff_size + << " is less than the minimum recommended size of " + << xport_params.num_recv_frames * MAX_ETHERNET_MTU + << " and may result in dropped packets on some NICs"); } - if (buff_params_out.send_buff_size < xport_params.num_send_frames * MAX_ETHERNET_MTU){ - UHD_LOG_WARNING("UDP", "The current send_buff_size of " << xport_params.send_buff_size - << " is less than the minimum recommended size of " - << xport_params.num_send_frames * MAX_ETHERNET_MTU - << " and may result in dropped packets on some NICs"); + if (buff_params_out.send_buff_size + < xport_params.num_send_frames * MAX_ETHERNET_MTU) { + UHD_LOG_WARNING("UDP", + "The current send_buff_size of " + << xport_params.send_buff_size + << " is less than the minimum recommended size of " + << xport_params.num_send_frames * MAX_ETHERNET_MTU + << " and may result in dropped packets on some NICs"); } return udp_trans; |