diff options
-rw-r--r-- | host/lib/transport/udp_common.hpp | 4 | ||||
-rw-r--r-- | host/lib/transport/udp_simple.cpp | 4 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy.cpp | 24 |
3 files changed, 18 insertions, 14 deletions
diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp index 44067b5dc..47775d9c4 100644 --- a/host/lib/transport/udp_common.hpp +++ b/host/lib/transport/udp_common.hpp @@ -23,13 +23,15 @@ namespace uhd{ namespace transport{ + typedef boost::shared_ptr<boost::asio::ip::udp::socket> socket_sptr; + /*! * Wait for the socket to become ready for a receive operation. * \param sock_fd the open socket file descriptor * \param timeout the timeout duration in seconds * \return true when the socket is ready for receive */ - UHD_INLINE bool wait_for_recv(int sock_fd, double timeout){ + UHD_INLINE bool wait_for_recv_ready(int sock_fd, double timeout){ //setup timeval for timeout timeval tv; //If the tv_usec > 1 second on some platforms, select will diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 094f570ff..1ee036d52 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -28,8 +28,6 @@ namespace asio = boost::asio; **********************************************************************/ class udp_simple_impl : public udp_simple{ public: - typedef boost::shared_ptr<asio::ip::udp::socket> socket_sptr; - udp_simple_impl( const std::string &addr, const std::string &port, bool bcast, bool connect ):_connected(connect){ @@ -58,7 +56,7 @@ public: } size_t recv(const asio::mutable_buffer &buff, double timeout){ - if (not wait_for_recv(_socket->native(), timeout)) return 0; + if (not wait_for_recv_ready(_socket->native(), timeout)) return 0; return _socket->receive(asio::buffer(buff)); } diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp index 793fc6fba..dda3bb547 100644 --- a/host/lib/transport/udp_zero_copy.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -139,7 +139,7 @@ public: asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); //create, open, and connect the socket - _socket = new asio::ip::udp::socket(_io_service); + _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); _sock_fd = _socket->native(); @@ -161,10 +161,6 @@ public: } } - ~udp_zero_copy_asio_impl(void){ - delete _socket; - } - //get size for internal socket buffer template <typename Opt> size_t get_buff_size(void) const{ Opt option; @@ -182,16 +178,24 @@ public: /******************************************************************* * Receive implementation: * - * Use select to perform a blocking receive with timeout. + * Perform a non-blocking receive for performance, + * and then fall back to a blocking receive with timeout. * Return the managed receive buffer with the new length. * When the caller is finished with the managed buffer, * the managed receive buffer is released back into the queue. ******************************************************************/ managed_recv_buffer::sptr get_recv_buff(double timeout){ udp_zero_copy_asio_mrb *mrb = NULL; - bool recv_ready = wait_for_recv(_sock_fd, timeout); - if (recv_ready and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ - return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0)); + if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ + + #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported + ssize_t ret = ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, MSG_DONTWAIT); + if (ret > 0) return mrb->get_new(ret); + #endif + + if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new( + ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0) + ); } return managed_recv_buffer::sptr(); } @@ -247,7 +251,7 @@ private: //asio guts -> socket and service asio::io_service _io_service; - asio::ip::udp::socket *_socket; + socket_sptr _socket; int _sock_fd; }; |