diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 18 | ||||
-rw-r--r-- | host/lib/transport/nirio/lvbitx/CMakeLists.txt | 5 | ||||
-rw-r--r-- | host/lib/transport/nirio/rpc/rpc_client.cpp | 24 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 47 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 35 | ||||
-rw-r--r-- | host/lib/transport/zero_copy_recv_offload.cpp | 158 |
6 files changed, 208 insertions, 79 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 6abc399b4..db21b9f8e 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -22,17 +22,15 @@ ######################################################################## # Include subdirectories (different than add) ######################################################################## -INCLUDE_SUBDIRECTORY(nirio) +IF(ENABLE_X300) + INCLUDE_SUBDIRECTORY(nirio) +ENDIF(ENABLE_X300) ######################################################################## # Setup libusb ######################################################################## -MESSAGE(STATUS "") -FIND_PACKAGE(USB1) - -LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF OFF) - IF(ENABLE_USB) + MESSAGE(STATUS "") MESSAGE(STATUS "USB support enabled via libusb.") INCLUDE_DIRECTORIES(${LIBUSB_INCLUDE_DIRS}) LIBUHD_APPEND_LIBS(${LIBUSB_LIBRARIES}) @@ -124,14 +122,20 @@ LIBUHD_PYTHON_GEN_SOURCE( ) LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp ) +IF(ENABLE_X300) + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp + ) +ENDIF(ENABLE_X300) + # Verbose Debug output for send/recv SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" ) OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" ) diff --git a/host/lib/transport/nirio/lvbitx/CMakeLists.txt b/host/lib/transport/nirio/lvbitx/CMakeLists.txt index b9a2a9f15..5741a12f8 100644 --- a/host/lib/transport/nirio/lvbitx/CMakeLists.txt +++ b/host/lib/transport/nirio/lvbitx/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2013 Ettus Research LLC +# Copyright 2013,2015 Ettus Research LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -30,8 +30,8 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile) SET(IMAGES_PATH_OPT --uhd-images-path=${UHD_IMAGES_DIR}) ADD_CUSTOM_COMMAND( - OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp + OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/process-lvbitx.py DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.hpp DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.cpp @@ -41,6 +41,7 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile) ) #make libuhd depend on the output file + LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp) LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp) ENDMACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM) diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp index bbaf9f235..3d62b57ae 100644 --- a/host/lib/transport/nirio/rpc/rpc_client.cpp +++ b/host/lib/transport/nirio/rpc/rpc_client.cpp @@ -1,5 +1,5 @@ /// -// Copyright 2013 Ettus Research LLC +// Copyright 2013,2016 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -55,22 +55,7 @@ rpc_client::rpc_client ( tcp::resolver::query::flags query_flags(tcp::resolver::query::passive); tcp::resolver::query query(tcp::v4(), server, port, query_flags); tcp::resolver::iterator iterator = resolver.resolve(query); - - #if BOOST_VERSION < 104700 - // default constructor creates end iterator - tcp::resolver::iterator end; - - boost::system::error_code error = boost::asio::error::host_not_found; - while (error && iterator != end) - { - _socket.close(); - _socket.connect(*iterator++, error); - } - if (error) - throw boost::system::system_error(error); - #else - boost::asio::connect(_socket, iterator); - #endif + boost::asio::connect(_socket, iterator); UHD_LOG << "rpc_client connected to server." << std::endl; @@ -109,11 +94,6 @@ rpc_client::rpc_client ( } catch (boost::exception&) { UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl; _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); -#if BOOST_VERSION < 104700 - } catch (std::exception& e) { - UHD_LOG << "rpc_client connection error: " << e.what() << std::endl; - _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); -#endif } } diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 8bfa1973a..5ca18d4a0 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -24,7 +24,6 @@ #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/tasks.hpp> -#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> @@ -35,7 +34,6 @@ #include <boost/format.hpp> #include <boost/bind.hpp> #include <boost/make_shared.hpp> -#include <boost/thread/barrier.hpp> #include <iostream> #include <vector> @@ -92,22 +90,15 @@ public: } ~recv_packet_handler(void){ - _task_barrier.interrupt(); - _task_handlers.clear(); + /* NOP */ } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; - _task_handlers.clear(); _props.resize(size); //re-initialize all buffers infos by re-creating the vector _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size)); - _task_barrier.resize(size); - _task_handlers.resize(size); - for (size_t i = 1/*skip 0*/; i < size; i++){ - _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i)); - }; } //! Get the channel width of this handler @@ -127,7 +118,7 @@ public: * \param threshold number of packets per channel */ void set_alignment_failure_threshold(const size_t threshold){ - _alignment_faulure_threshold = threshold*this->size(); + _alignment_failure_threshold = threshold*this->size(); } //! Set the rate of ticks per second @@ -203,6 +194,12 @@ public: //! Overload call to issue stream commands void issue_stream_cmd(const stream_cmd_t &stream_cmd) { + if (stream_cmd.stream_now + and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS + and _props.size() > 1) { + throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); + } + for (size_t i = 0; i < _props.size(); i++) { if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd); @@ -269,7 +266,7 @@ private: size_t _header_offset_words32; double _tick_rate, _samp_rate; bool _queue_error_for_next_call; - size_t _alignment_faulure_threshold; + size_t _alignment_failure_threshold; rx_metadata_t _queue_metadata; struct xport_chan_props_type{ xport_chan_props_type(void): @@ -593,7 +590,7 @@ private: } //too many iterations: detect alignment failure - if (iterations++ > _alignment_faulure_threshold){ + if (iterations++ > _alignment_failure_threshold){ UHD_MSG(error) << boost::format( "The receive packet handler failed to time-align packets.\n" "%u received packets were processed by the handler.\n" @@ -657,7 +654,9 @@ private: _convert_bytes_to_copy = bytes_to_copy; //perform N channels of conversion - converter_thread_task(0); + for (size_t i = 0; i < buffs.size(); i++) { + convert_to_out_buff(i); + } //update the copy buffer's availability info.data_bytes_to_copy -= bytes_to_copy; @@ -670,15 +669,15 @@ private: return nsamps_to_copy_per_io_buff; } - /******************************************************************* - * Perform one thread's work of the conversion task. - * The entry and exit use a dual synchronization barrier, - * to wait for data to become ready and block until completion. - ******************************************************************/ - UHD_INLINE void converter_thread_task(const size_t index) + /*! Run the conversion from the internal buffers to the user's output + * buffer. + * + * - Calls the converter + * - Releases internal data buffers + * - Updates read/write pointers + */ + inline void convert_to_out_buff(const size_t index) { - _task_barrier.wait(); - //shortcut references to local data structures buffers_info_type &buff_info = get_curr_buffer_info(); per_buffer_info_type &info = buff_info[index]; @@ -702,13 +701,9 @@ private: if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){ info.buff.reset(); //effectively a release } - - if (index == 0) _task_barrier.wait_others(); } //! Shared variables for the worker threads - reusable_barrier _task_barrier; - std::vector<task::sptr> _task_handlers; size_t _convert_nsamps; const rx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index c2810842e..a6b9b12d0 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -24,11 +24,11 @@ #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/tasks.hpp> -#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> +#include <boost/thread/thread.hpp> #include <boost/thread/thread_time.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> @@ -74,22 +74,15 @@ public: } ~send_packet_handler(void){ - _task_barrier.interrupt(); - _task_handlers.clear(); + /* NOP */ } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; - _task_handlers.clear(); _props.resize(size); static const boost::uint64_t zero = 0; _zero_buffs.resize(size, &zero); - _task_barrier.resize(size); - _task_handlers.resize(size); - for (size_t i = 1/*skip 0*/; i < size; i++){ - _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i)); - }; } //! Get the channel width of this handler @@ -377,21 +370,23 @@ private: _convert_if_packet_info = &if_packet_info; //perform N channels of conversion - converter_thread_task(0); + for (size_t i = 0; i < buffs.size(); i++) { + convert_to_in_buff(i); + } _next_packet_seq++; //increment sequence after commits return nsamps_per_buff; } - /******************************************************************* - * Perform one thread's work of the conversion task. - * The entry and exit use a dual synchronization barrier, - * to wait for data to become ready and block until completion. - ******************************************************************/ - UHD_INLINE void converter_thread_task(const size_t index) + /*! Run the conversion from the internal buffers to the user's input + * buffer. + * + * - Calls the converter + * - Releases internal data buffers + * - Updates read/write pointers + */ + UHD_INLINE void convert_to_in_buff(const size_t index) { - _task_barrier.wait(); - //shortcut references to local data structures managed_send_buffer::sptr &buff = _props[index].buff; vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; @@ -419,13 +414,9 @@ private: const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; buff->commit(num_vita_words32*sizeof(boost::uint32_t)); buff.reset(); //effectively a release - - if (index == 0) _task_barrier.wait_others(); } //! Shared variables for the worker threads - reusable_barrier _task_barrier; - std::vector<task::sptr> _task_handlers; size_t _convert_nsamps; const tx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; diff --git a/host/lib/transport/zero_copy_recv_offload.cpp b/host/lib/transport/zero_copy_recv_offload.cpp new file mode 100644 index 000000000..e8b013abc --- /dev/null +++ b/host/lib/transport/zero_copy_recv_offload.cpp @@ -0,0 +1,158 @@ +// +// Copyright 2016 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/zero_copy_recv_offload.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/safe_call.hpp> +#include <boost/format.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include <boost/bind.hpp> + +using namespace uhd; +using namespace uhd::transport; + +typedef bounded_buffer<managed_recv_buffer::sptr> bounded_buffer_t; + +/*********************************************************************** + * Zero copy offload transport: + * An intermediate transport that utilizes threading to free + * the main thread from any receive work. + **********************************************************************/ +class zero_copy_recv_offload_impl : public zero_copy_recv_offload { +public: + typedef boost::shared_ptr<zero_copy_recv_offload_impl> sptr; + + zero_copy_recv_offload_impl(zero_copy_if::sptr transport, + const double timeout) : + _transport(transport), _timeout(timeout), + _inbox(transport->get_num_recv_frames()), + _recv_done(false) + { + UHD_LOG << "Created threaded transport" << std::endl; + + // Create the receive and send threads to offload + // the system calls onto other threads + _recv_thread = boost::thread( + boost::bind(&zero_copy_recv_offload_impl::enqueue_recv, this) + ); + } + + // Receive thread flags + void set_recv_done() + { + boost::lock_guard<boost::mutex> guard(_recv_mutex); + _recv_done = true; + } + + bool is_recv_done() + { + boost::lock_guard<boost::mutex> guard(_recv_mutex); + return _recv_done; + } + + ~zero_copy_recv_offload_impl() + { + // Signal the threads we're finished + set_recv_done(); + + // Wait for them to join + UHD_SAFE_CALL( + _recv_thread.join(); + ) + } + + // The receive thread function is responsible for + // pulling pointers to managed receiver buffers quickly + void enqueue_recv() + { + while (not is_recv_done()) { + managed_recv_buffer::sptr buff = _transport->get_recv_buff(_timeout); + if (not buff) continue; + _inbox.push_with_timed_wait(buff, _timeout); + } + } + + /******************************************************************* + * Receive implementation: + * Pop the receive buffer pointer from the underlying transport + ******************************************************************/ + managed_recv_buffer::sptr get_recv_buff(double timeout) + { + managed_recv_buffer::sptr ptr; + _inbox.pop_with_timed_wait(ptr, timeout); + return ptr; + } + + size_t get_num_recv_frames() const + { + return _transport->get_num_recv_frames(); + } + + size_t get_recv_frame_size() const + { + return _transport->get_recv_frame_size(); + } + + /******************************************************************* + * Send implementation: + * Pass the send buffer pointer from the underlying transport + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double timeout) + { + return _transport->get_send_buff(timeout); + } + + size_t get_num_send_frames() const + { + return _transport->get_num_send_frames(); + } + + size_t get_send_frame_size() const + { + return _transport->get_send_frame_size(); + } + +private: + // The linked transport + zero_copy_if::sptr _transport; + + const double _timeout; + + // Shared buffers + bounded_buffer_t _inbox; + + // Threading + bool _recv_done; + boost::thread _recv_thread; + boost::mutex _recv_mutex; +}; + +zero_copy_recv_offload::sptr zero_copy_recv_offload::make( + zero_copy_if::sptr transport, + const double timeout) +{ + zero_copy_recv_offload_impl::sptr zero_copy_recv_offload( + new zero_copy_recv_offload_impl(transport, timeout) + ); + + return zero_copy_recv_offload; +} |