diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 27 | ||||
-rw-r--r-- | host/lib/transport/if_addrs.cpp | 16 | ||||
-rw-r--r-- | host/lib/transport/udp_common.hpp | 53 | ||||
-rw-r--r-- | host/lib/transport/udp_simple.cpp | 160 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy.cpp (renamed from host/lib/transport/udp_zero_copy_asio.cpp) | 41 |
5 files changed, 130 insertions, 167 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index a98bcc14e..a5bf9c5f1 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -48,20 +48,31 @@ ENDIF(ENABLE_USB) ######################################################################## MESSAGE(STATUS "") MESSAGE(STATUS "Configuring interface address discovery...") - +INCLUDE(CheckCXXSourceCompiles) INCLUDE(CheckIncludeFileCXX) -CHECK_INCLUDE_FILE_CXX(ifaddrs.h HAVE_IFADDRS_H) + +CHECK_CXX_SOURCE_COMPILES(" + #include <ifaddrs.h> + int main(){ + struct ifaddrs *ifap; + getifaddrs(&ifap); + return 0; + } + " HAVE_GETIFADDRS +) + CHECK_INCLUDE_FILE_CXX(winsock2.h HAVE_WINSOCK2_H) -IF(HAVE_IFADDRS_H) +IF(HAVE_GETIFADDRS) MESSAGE(STATUS " Interface address discovery supported through getifaddrs.") - SET(IF_ADDRS_DEFS HAVE_IFADDRS_H) + SET(IF_ADDRS_DEFS HAVE_GETIFADDRS) ELSEIF(HAVE_WINSOCK2_H) MESSAGE(STATUS " Interface address discovery supported through SIO_GET_INTERFACE_LIST.") - SET(IF_ADDRS_DEFS HAVE_WINSOCK2_H) -ELSE(HAVE_IFADDRS_H) + SET(IF_ADDRS_DEFS HAVE_SIO_GET_INTERFACE_LIST) +ELSE() MESSAGE(STATUS " Interface address discovery not supported.") -ENDIF(HAVE_IFADDRS_H) + SET(IF_ADDRS_DEFS HAVE_IF_ADDRS_DUMMY) +ENDIF() SET_SOURCE_FILES_PROPERTIES( ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp @@ -80,6 +91,6 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy_asio.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/vrt_packet_handler.hpp ) diff --git a/host/lib/transport/if_addrs.cpp b/host/lib/transport/if_addrs.cpp index 17cf8455b..b7c8ad844 100644 --- a/host/lib/transport/if_addrs.cpp +++ b/host/lib/transport/if_addrs.cpp @@ -20,14 +20,10 @@ #include <boost/cstdint.hpp> #include <iostream> -uhd::transport::if_addrs_t::if_addrs_t(void){ - /* NOP */ -} - /*********************************************************************** * Interface address discovery through ifaddrs api **********************************************************************/ -#if defined(HAVE_IFADDRS_H) +#ifdef HAVE_GETIFADDRS #include <ifaddrs.h> static boost::asio::ip::address_v4 sockaddr_to_ip_addr(sockaddr *addr){ @@ -59,10 +55,12 @@ std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){ return if_addrs; } +#endif /* HAVE_GETIFADDRS */ + /*********************************************************************** * Interface address discovery through windows api **********************************************************************/ -#elif defined(HAVE_WINSOCK2_H) +#ifdef HAVE_SIO_GET_INTERFACE_LIST #include <winsock2.h> std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){ @@ -98,13 +96,15 @@ std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){ return if_addrs; } +#endif /* HAVE_SIO_GET_INTERFACE_LIST */ + /*********************************************************************** * Interface address discovery not included **********************************************************************/ -#else /* HAVE_IFADDRS_H */ +#ifdef HAVE_IF_ADDRS_DUMMY std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){ return std::vector<if_addrs_t>(); } -#endif /* HAVE_IFADDRS_H */ +#endif /* HAVE_IF_ADDRS_DUMMY */ diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp new file mode 100644 index 000000000..47775d9c4 --- /dev/null +++ b/host/lib/transport/udp_common.hpp @@ -0,0 +1,53 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <boost/asio.hpp> + +namespace uhd{ namespace transport{ + + typedef boost::shared_ptr<boost::asio::ip::udp::socket> socket_sptr; + + /*! + * Wait for the socket to become ready for a receive operation. + * \param sock_fd the open socket file descriptor + * \param timeout the timeout duration in seconds + * \return true when the socket is ready for receive + */ + UHD_INLINE bool wait_for_recv_ready(int sock_fd, double timeout){ + //setup timeval for timeout + timeval tv; + //If the tv_usec > 1 second on some platforms, select will + //error EINVAL: An invalid timeout interval was specified. + tv.tv_sec = int(timeout); + tv.tv_usec = int(timeout*1000000)%1000000; + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(sock_fd, &rset); + + //call select with timeout on receive socket + return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0; + } + +}} //namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 6799ac7b2..1ee036d52 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -1,5 +1,5 @@ // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -15,159 +15,69 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. // +#include "udp_common.hpp" #include <uhd/transport/udp_simple.hpp> -#include <boost/asio.hpp> -#include <boost/thread.hpp> #include <boost/format.hpp> #include <iostream> using namespace uhd::transport; +namespace asio = boost::asio; /*********************************************************************** - * Helper Functions + * UDP simple implementation: connected and broadcast **********************************************************************/ -/*! - * Wait for available data or timeout. - * \param socket the asio socket - * \param timeout the timeout in seconds - * \return false for timeout, true for data - */ -static bool wait_available( - boost::asio::ip::udp::socket &socket, double timeout -){ - #if defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) - - //setup timeval for timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = long(timeout*1e6); - - //setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(socket.native(), &rset); - - return ::select(socket.native()+1, &rset, NULL, NULL, &tv) > 0; - - #else /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ - - //FIXME: why does select fail on macintosh? - for (size_t i = 0; i < size_t(timeout*1e3); i++){ - if (socket.available()) return true; - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } - return false; - - #endif /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ -} - -/*********************************************************************** - * UDP connected implementation class - **********************************************************************/ -class udp_connected_impl : public udp_simple{ +class udp_simple_impl : public udp_simple{ public: - //structors - udp_connected_impl(const std::string &addr, const std::string &port); - ~udp_connected_impl(void); - - //send/recv - size_t send(const boost::asio::const_buffer &); - size_t recv(const boost::asio::mutable_buffer &, double); - -private: - boost::asio::ip::udp::socket *_socket; - boost::asio::io_service _io_service; -}; + udp_simple_impl( + const std::string &addr, const std::string &port, bool bcast, bool connect + ):_connected(connect){ + //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; -udp_connected_impl::udp_connected_impl(const std::string &addr, const std::string &port){ - //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + //resolve the address + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + _receiver_endpoint = *resolver.resolve(query); - // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + //create and open the socket + _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); + _socket->open(asio::ip::udp::v4()); - // Create, open, and connect the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); - _socket->connect(receiver_endpoint); -} + //allow broadcasting + _socket->set_option(asio::socket_base::broadcast(bcast)); -udp_connected_impl::~udp_connected_impl(void){ - delete _socket; -} - -size_t udp_connected_impl::send(const boost::asio::const_buffer &buff){ - return _socket->send(boost::asio::buffer(buff)); -} + //connect the socket + if (connect) _socket->connect(_receiver_endpoint); -size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ - if (not wait_available(*_socket, timeout)) return 0; - return _socket->receive(boost::asio::buffer(buff)); -} + } -/*********************************************************************** - * UDP broadcast implementation class - **********************************************************************/ -class udp_broadcast_impl : public udp_simple{ -public: - //structors - udp_broadcast_impl(const std::string &addr, const std::string &port); - ~udp_broadcast_impl(void); + size_t send(const asio::const_buffer &buff){ + if (_connected) return _socket->send(asio::buffer(buff)); + return _socket->send_to(asio::buffer(buff), _receiver_endpoint); + } - //send/recv - size_t send(const boost::asio::const_buffer &); - size_t recv(const boost::asio::mutable_buffer &, double); + size_t recv(const asio::mutable_buffer &buff, double timeout){ + if (not wait_for_recv_ready(_socket->native(), timeout)) return 0; + return _socket->receive(asio::buffer(buff)); + } private: - boost::asio::ip::udp::socket *_socket; - boost::asio::ip::udp::endpoint _receiver_endpoint; - boost::asio::io_service _io_service; + bool _connected; + asio::io_service _io_service; + socket_sptr _socket; + asio::ip::udp::endpoint _receiver_endpoint; }; -udp_broadcast_impl::udp_broadcast_impl(const std::string &addr, const std::string &port){ - //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - - // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - _receiver_endpoint = *resolver.resolve(query); - - // Create and open the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); - - // Allow broadcasting - boost::asio::socket_base::broadcast option(true); - _socket->set_option(option); - -} - -udp_broadcast_impl::~udp_broadcast_impl(void){ - delete _socket; -} - -size_t udp_broadcast_impl::send(const boost::asio::const_buffer &buff){ - return _socket->send_to(boost::asio::buffer(buff), _receiver_endpoint); -} - -size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ - if (not wait_available(*_socket, timeout)) return 0; - boost::asio::ip::udp::endpoint sender_endpoint; - return _socket->receive_from(boost::asio::buffer(buff), sender_endpoint); -} - /*********************************************************************** * UDP public make functions **********************************************************************/ udp_simple::sptr udp_simple::make_connected( const std::string &addr, const std::string &port ){ - return sptr(new udp_connected_impl(addr, port)); + return sptr(new udp_simple_impl(addr, port, false, true /* no bcast, connect */)); } udp_simple::sptr udp_simple::make_broadcast( const std::string &addr, const std::string &port ){ - return sptr(new udp_broadcast_impl(addr, port)); + return sptr(new udp_simple_impl(addr, port, true, false /* bcast, no connect */)); } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy.cpp index 05352ffce..dda3bb547 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy.cpp @@ -15,13 +15,12 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. // +#include "udp_common.hpp" #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/assert.hpp> #include <uhd/utils/warning.hpp> -#include <boost/asio.hpp> #include <boost/format.hpp> #include <iostream> #include <list> @@ -140,7 +139,7 @@ public: asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); //create, open, and connect the socket - _socket = new asio::ip::udp::socket(_io_service); + _socket = socket_sptr(new asio::ip::udp::socket(_io_service)); _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); _sock_fd = _socket->native(); @@ -162,10 +161,6 @@ public: } } - ~udp_zero_copy_asio_impl(void){ - delete _socket; - } - //get size for internal socket buffer template <typename Opt> size_t get_buff_size(void) const{ Opt option; @@ -183,30 +178,24 @@ public: /******************************************************************* * Receive implementation: * - * Use select to perform a blocking receive with timeout. + * Perform a non-blocking receive for performance, + * and then fall back to a blocking receive with timeout. * Return the managed receive buffer with the new length. * When the caller is finished with the managed buffer, * the managed receive buffer is released back into the queue. ******************************************************************/ - UHD_INLINE bool is_recv_ready(double timeout){ - //setup timeval for timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = long(timeout*1e6); - - //setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(_sock_fd, &rset); - - //call select with timeout on receive socket - return ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0; - } - managed_recv_buffer::sptr get_recv_buff(double timeout){ udp_zero_copy_asio_mrb *mrb = NULL; - if (is_recv_ready(timeout) and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ - return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0)); + if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ + + #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported + ssize_t ret = ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, MSG_DONTWAIT); + if (ret > 0) return mrb->get_new(ret); + #endif + + if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new( + ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0) + ); } return managed_recv_buffer::sptr(); } @@ -262,7 +251,7 @@ private: //asio guts -> socket and service asio::io_service _io_service; - asio::ip::udp::socket *_socket; + socket_sptr _socket; int _sock_fd; }; |