From e0d84e8c8b4f0bb9fec22b48b7c61b9ca3eb8dbd Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Thu, 11 Nov 2010 12:19:03 -0800 Subject: uhd: pulled in some worthwhile changes from flow control branch --- host/lib/transport/udp_simple.cpp | 44 +++++++++++++++---------------- host/lib/transport/udp_zero_copy_asio.cpp | 31 ++++++++++++++-------- host/lib/transport/vrt_packet_handler.hpp | 16 ++++++++--- 3 files changed, 55 insertions(+), 36 deletions(-) (limited to 'host/lib/transport') diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 89750f99d..5829b462b 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -27,23 +27,25 @@ using namespace uhd::transport; * Helper Functions **********************************************************************/ /*! - * A receive timeout for a socket: - * - * It seems that asio cannot have timeouts with synchronous io. - * However, we can implement a polling loop that will timeout. - * This is okay bacause this is the slow-path implementation. - * + * Wait for available data or timeout. * \param socket the asio socket - * \param timeout_ms the timeout in milliseconds + * \param timeout the timeout in seconds + * \return false for timeout, true for data */ -static void reasonable_recv_timeout( - boost::asio::ip::udp::socket &socket, size_t timeout_ms +static bool wait_available( + boost::asio::ip::udp::socket &socket, double timeout ){ - boost::asio::deadline_timer timer(socket.get_io_service()); - timer.expires_from_now(boost::posix_time::milliseconds(timeout_ms)); - while (not (socket.available() or timer.expires_from_now().is_negative())){ - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = long(timeout*1e6); + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(socket.native(), &rset); + + return ::select(socket.native()+1, &rset, NULL, NULL, &tv) > 0; } /*********************************************************************** @@ -57,7 +59,7 @@ public: //send/recv size_t send(const boost::asio::const_buffer &); - size_t recv(const boost::asio::mutable_buffer &, size_t); + size_t recv(const boost::asio::mutable_buffer &, double); private: boost::asio::ip::udp::socket *_socket; @@ -86,9 +88,8 @@ size_t udp_connected_impl::send(const boost::asio::const_buffer &buff){ return _socket->send(boost::asio::buffer(buff)); } -size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ - reasonable_recv_timeout(*_socket, timeout_ms); - if (not _socket->available()) return 0; +size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ + if (not wait_available(*_socket, timeout)) return 0; return _socket->receive(boost::asio::buffer(buff)); } @@ -103,7 +104,7 @@ public: //send/recv size_t send(const boost::asio::const_buffer &); - size_t recv(const boost::asio::mutable_buffer &, size_t); + size_t recv(const boost::asio::mutable_buffer &, double); private: boost::asio::ip::udp::socket *_socket; @@ -137,9 +138,8 @@ size_t udp_broadcast_impl::send(const boost::asio::const_buffer &buff){ return _socket->send_to(boost::asio::buffer(buff), _receiver_endpoint); } -size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ - reasonable_recv_timeout(*_socket, timeout_ms); - if (not _socket->available()) return 0; +size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ + if (not wait_available(*_socket, timeout)) return 0; boost::asio::ip::udp::endpoint sender_endpoint; return _socket->receive_from(boost::asio::buffer(buff), sender_endpoint); } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index ed29864e9..c758fa894 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -59,16 +59,23 @@ static const size_t DEFAULT_NUM_RECV_FRAMES = 32; #else static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu; #endif + //The non-async send only ever requires a single frame //because the buffer will be committed before a new get. #ifdef USE_ASIO_ASYNC_SEND static const size_t DEFAULT_NUM_SEND_FRAMES = 32; #else -static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;; +static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu; #endif -//a single concurrent thread for io_service seems to be the fastest +//The number of service threads to spawn for async ASIO: +//A single concurrent thread for io_service seems to be the fastest. +//Threads are disabled when no async implementations are enabled. +#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND) static const size_t CONCURRENCY_HINT = 1; +#else +static const size_t CONCURRENCY_HINT = 0; +#endif /*********************************************************************** * Zero Copy UDP implementation with ASIO: @@ -86,11 +93,12 @@ public: const std::string &port, const device_addr_t &hints ): - _io_service(hints.cast("concurrency_hint", CONCURRENCY_HINT)), _recv_frame_size(size_t(hints.cast("recv_frame_size", udp_simple::mtu))), _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))), _send_frame_size(size_t(hints.cast("send_frame_size", udp_simple::mtu))), - _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_SEND_FRAMES))) + _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_SEND_FRAMES))), + _concurrency_hint(hints.cast("concurrency_hint", CONCURRENCY_HINT)), + _io_service(_concurrency_hint) { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -129,7 +137,7 @@ public: //spawn the service threads that will run the io service _work = new asio::io_service::work(_io_service); //new work to delete later - for (size_t i = 0; i < CONCURRENCY_HINT; i++) _thread_group.create_thread( + for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread( boost::bind(&udp_zero_copy_asio_impl::service, this) ); } @@ -292,12 +300,6 @@ public: size_t get_send_frame_size(void) const {return _send_frame_size;} private: - //asio guts -> socket and service - asio::ip::udp::socket *_socket; - asio::io_service _io_service; - asio::io_service::work *_work; - int _sock_fd; - //memory management -> buffers and fifos boost::thread_group _thread_group; boost::shared_array _send_buffer, _recv_buffer; @@ -305,6 +307,13 @@ private: pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; const size_t _recv_frame_size, _num_recv_frames; const size_t _send_frame_size, _num_send_frames; + + //asio guts -> socket and service + size_t _concurrency_hint; + asio::io_service _io_service; + asio::ip::udp::socket *_socket; + asio::io_service::work *_work; + int _sock_fd; }; /*********************************************************************** diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index 939517411..278bcfeaa 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -318,7 +318,7 @@ template UHD_INLINE T get_context_code( ){ //load the rest of the if_packet_info in here if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*otw_type.get_sample_size())/sizeof(boost::uint32_t); - if_packet_info.packet_count = state.next_packet_seq++; + if_packet_info.packet_count = state.next_packet_seq; //get send buffers for each channel managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); @@ -345,6 +345,7 @@ template UHD_INLINE T get_context_code( size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); send_buffs[i]->commit(num_bytes_total); } + state.next_packet_seq++; //increment sequence after commits return num_samps; } @@ -387,10 +388,19 @@ template UHD_INLINE T get_context_code( if_packet_info.sob = metadata.start_of_burst; if_packet_info.eob = metadata.end_of_burst; + //TODO remove this code when sample counts of zero are supported by hardware + std::vector buffs_(buffs); + size_t total_num_samps_(total_num_samps); + if (total_num_samps == 0){ + static const boost::uint64_t zeros = 0; //max size of a host sample + buffs_ = std::vector(buffs.size(), &zeros); + total_num_samps_ = 1; + } + return _send1( state, - buffs, 0, - std::min(total_num_samps, max_samples_per_packet), + buffs_, 0, + std::min(total_num_samps_, max_samples_per_packet), if_packet_info, io_type, otw_type, vrt_packer, -- cgit v1.2.3 From d7317dac9360f26444da4043ca066a46749917b3 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Wed, 17 Nov 2010 09:21:04 -0800 Subject: udp: added polling alternative to select for mac --- host/lib/transport/udp_simple.cpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'host/lib/transport') diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp index 5829b462b..6799ac7b2 100644 --- a/host/lib/transport/udp_simple.cpp +++ b/host/lib/transport/udp_simple.cpp @@ -35,6 +35,8 @@ using namespace uhd::transport; static bool wait_available( boost::asio::ip::udp::socket &socket, double timeout ){ + #if defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) + //setup timeval for timeout timeval tv; tv.tv_sec = 0; @@ -46,6 +48,17 @@ static bool wait_available( FD_SET(socket.native(), &rset); return ::select(socket.native()+1, &rset, NULL, NULL, &tv) > 0; + + #else /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ + + //FIXME: why does select fail on macintosh? + for (size_t i = 0; i < size_t(timeout*1e3); i++){ + if (socket.available()) return true; + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + } + return false; + + #endif /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ } /*********************************************************************** -- cgit v1.2.3