Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt                |  19
-rw-r--r-- | host/lib/transport/muxed_zero_copy_if.cpp        | 250
-rw-r--r-- | host/lib/transport/nirio/lvbitx/CMakeLists.txt   |   5
-rw-r--r-- | host/lib/transport/nirio/rpc/rpc_client.cpp      |  24
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp |  64
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp |  35
-rw-r--r-- | host/lib/transport/zero_copy_recv_offload.cpp    | 158
7 files changed, 473 insertions, 82 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 6abc399b4..44c8d59af 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -22,17 +22,15 @@ ######################################################################## # Include subdirectories (different than add) ######################################################################## -INCLUDE_SUBDIRECTORY(nirio) +IF(ENABLE_X300) + INCLUDE_SUBDIRECTORY(nirio) +ENDIF(ENABLE_X300) ######################################################################## # Setup libusb ######################################################################## -MESSAGE(STATUS "") -FIND_PACKAGE(USB1) - -LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF OFF) - IF(ENABLE_USB) + MESSAGE(STATUS "") MESSAGE(STATUS "USB support enabled via libusb.") INCLUDE_DIRECTORIES(${LIBUSB_INCLUDE_DIRS}) LIBUHD_APPEND_LIBS(${LIBUSB_LIBRARIES}) @@ -124,14 +122,21 @@ LIBUHD_PYTHON_GEN_SOURCE( ) LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp ) +IF(ENABLE_X300) + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp + ) +ENDIF(ENABLE_X300) + # Verbose Debug output for send/recv SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" ) OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" ) diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp new file mode 100644 index 000000000..996db3c98 --- /dev/null +++ b/host/lib/transport/muxed_zero_copy_if.cpp @@ -0,0 +1,250 @@ +// +// Copyright 2016 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. 
+// + +#include <uhd/transport/muxed_zero_copy_if.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/safe_call.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread.hpp> +#include <boost/thread/locks.hpp> +#include <map> + +using namespace uhd; +using namespace uhd::transport; + +class muxed_zero_copy_if_impl : public muxed_zero_copy_if, + public boost::enable_shared_from_this<muxed_zero_copy_if_impl> +{ +public: + typedef boost::shared_ptr<muxed_zero_copy_if_impl> sptr; + + muxed_zero_copy_if_impl( + zero_copy_if::sptr base_xport, + stream_classifier_fn classify_fn, + size_t max_streams + ): + _base_xport(base_xport), _classify(classify_fn), + _max_num_streams(max_streams), _num_dropped_frames(0) + { + //Create the receive thread to poll the underlying transport + //and classify packets into queues + _recv_thread = boost::thread( + boost::bind(&muxed_zero_copy_if_impl::_update_queues, this)); + } + + virtual ~muxed_zero_copy_if_impl() + { + UHD_SAFE_CALL( + //Interrupt buffer updater loop + _recv_thread.interrupt(); + //Wait for loop to finish + //No timeout on join. The recv loop is guaranteed + //to terminate in a reasonable amount of time because + //there are no timed blocks on the underlying. + _recv_thread.join(); + //Flush base transport + while (_base_xport->get_recv_buff(0.0001)) /*NOP*/; + //Release child streams + //Note that this will not delete or flush the child streams + //until the owners of the streams have released the respective + //shared pointers. This ensures that packets are not dropped. + _streams.clear(); + ); + } + + virtual zero_copy_if::sptr make_stream(const uint32_t stream_num) + { + boost::lock_guard<boost::mutex> lock(_mutex); + if (_streams.size() >= _max_num_streams) { + throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. 
cannot create more streams."); + } + stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num); + _streams[stream_num] = stream; + return stream; + } + + virtual size_t get_num_dropped_frames() const + { + return _num_dropped_frames; + } + + void remove_stream(const uint32_t stream_num) + { + boost::lock_guard<boost::mutex> lock(_mutex); + _streams.erase(stream_num); + } + +private: + class stream_impl : public zero_copy_if + { + public: + typedef boost::shared_ptr<stream_impl> sptr; + typedef boost::weak_ptr<stream_impl> wptr; + + stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num): + _stream_num(stream_num), _muxed_xport(muxed_xport), + _buff_queue(muxed_xport->base_xport()->get_num_recv_frames()) + { + } + + ~stream_impl(void) + { + //First remove the stream from muxed transport + //so no more frames are pushed in + _muxed_xport->remove_stream(_stream_num); + //Flush the transport + managed_recv_buffer::sptr buff; + while (_buff_queue.pop_with_haste(buff)) { + //NOP + } + } + + size_t get_num_recv_frames(void) const { + return _muxed_xport->base_xport()->get_num_recv_frames(); + } + + size_t get_recv_frame_size(void) const { + return _muxed_xport->base_xport()->get_recv_frame_size(); + } + + managed_recv_buffer::sptr get_recv_buff(double timeout) { + managed_recv_buffer::sptr buff; + if (_buff_queue.pop_with_timed_wait(buff, timeout)) { + return buff; + } else { + return managed_recv_buffer::sptr(); + } + + } + + void push_recv_buff(managed_recv_buffer::sptr buff) { + _buff_queue.push_with_wait(buff); + } + + size_t get_num_send_frames(void) const { + return _muxed_xport->base_xport()->get_num_send_frames(); + } + + size_t get_send_frame_size(void) const { + return _muxed_xport->base_xport()->get_send_frame_size(); + } + + managed_send_buffer::sptr get_send_buff(double timeout) + { + return _muxed_xport->base_xport()->get_send_buff(timeout); + } + + private: + const uint32_t _stream_num; + muxed_zero_copy_if_impl::sptr _muxed_xport; + bounded_buffer<managed_recv_buffer::sptr> _buff_queue; + }; + + inline zero_copy_if::sptr& base_xport() { return _base_xport; } + + void _update_queues() + { + //Run forever: + // - Pull packets from the base transport + // - Classify them + // - Push them to the appropriate receive queue + while (true) { + { //Uninterruptable block of code + boost::this_thread::disable_interruption interrupt_disabler; + if (not _process_next_buffer()) { + //Be a good citizen and yield if no packet is processed + static const size_t MIN_DUR = 1; + boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR)); + //We call sleep(MIN_DUR) above instead of yield() to ensure that we + //relinquish the current scheduler time slot. + //yield() is a hint to the scheduler to end the time + //slice early and schedule in another thread that is ready to run. + //However in most situations, there will be no other thread and + //this thread will continue to run which will rail a CPU core. + //We call sleep(MIN_DUR=1) instead which will sleep for a minimum time. + //Ideally we would like to use boost::chrono::.*seconds::min() but that + //is bound to 0, which causes the sleep_for call to be a no-op and + //thus useless to actually force a sleep. + + //**************************************************************** + //NOTE: This behavior makes this transport a poor choice for + // low latency communication. 
+ //**************************************************************** + } + } + //Check if the master thread has requested a shutdown + if (boost::this_thread::interruption_requested()) break; + } + } + + bool _process_next_buffer() + { + managed_recv_buffer::sptr buff = _base_xport->get_recv_buff(0.0); + if (buff) { + stream_impl::sptr stream; + try { + const uint32_t stream_num = _classify(buff->cast<void*>(), _base_xport->get_recv_frame_size()); + { + //Hold the stream mutex long enough to pull a bounded buffer + //and lock it (increment its ref count). + boost::lock_guard<boost::mutex> lock(_mutex); + stream_map_t::iterator str_iter = _streams.find(stream_num); + if (str_iter != _streams.end()) { + stream = (*str_iter).second.lock(); + } + } + } catch (std::exception&) { + //If _classify throws we simply drop the frame + } + //Once a bounded buffer is acquired, we can rely on its + //thread safety to serialize with the consumer. + if (stream.get()) { + stream->push_recv_buff(buff); + } else { + boost::lock_guard<boost::mutex> lock(_mutex); + _num_dropped_frames++; + } + //We processed a packet, and there could be more coming + //Don't yield in the next iteration. + return true; + } else { + //The base transport is idle. Return false to let the + //thread yield. + return false; + } + } + + typedef std::map<uint32_t, stream_impl::wptr> stream_map_t; + + zero_copy_if::sptr _base_xport; + stream_classifier_fn _classify; + stream_map_t _streams; + const size_t _max_num_streams; + size_t _num_dropped_frames; + boost::thread _recv_thread; + boost::mutex _mutex; +}; + +muxed_zero_copy_if::sptr muxed_zero_copy_if::make( + zero_copy_if::sptr base_xport, + muxed_zero_copy_if::stream_classifier_fn classify_fn, + size_t max_streams +) { + return boost::make_shared<muxed_zero_copy_if_impl>(base_xport, classify_fn, max_streams); +} diff --git a/host/lib/transport/nirio/lvbitx/CMakeLists.txt b/host/lib/transport/nirio/lvbitx/CMakeLists.txt index b9a2a9f15..5741a12f8 100644 --- a/host/lib/transport/nirio/lvbitx/CMakeLists.txt +++ b/host/lib/transport/nirio/lvbitx/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2013 Ettus Research LLC +# Copyright 2013,2015 Ettus Research LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -30,8 +30,8 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile) SET(IMAGES_PATH_OPT --uhd-images-path=${UHD_IMAGES_DIR}) ADD_CUSTOM_COMMAND( - OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp + OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/process-lvbitx.py DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.hpp DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.cpp @@ -41,6 +41,7 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile) ) #make libuhd depend on the output file + LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp) LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp) ENDMACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM) diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp index bbaf9f235..3d62b57ae 100644 --- a/host/lib/transport/nirio/rpc/rpc_client.cpp +++ b/host/lib/transport/nirio/rpc/rpc_client.cpp @@ -1,5 +1,5 @@ /// -// Copyright 2013 Ettus Research LLC +// Copyright 2013,2016 Ettus Research LLC // // This program is free 
software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -55,22 +55,7 @@ rpc_client::rpc_client ( tcp::resolver::query::flags query_flags(tcp::resolver::query::passive); tcp::resolver::query query(tcp::v4(), server, port, query_flags); tcp::resolver::iterator iterator = resolver.resolve(query); - - #if BOOST_VERSION < 104700 - // default constructor creates end iterator - tcp::resolver::iterator end; - - boost::system::error_code error = boost::asio::error::host_not_found; - while (error && iterator != end) - { - _socket.close(); - _socket.connect(*iterator++, error); - } - if (error) - throw boost::system::system_error(error); - #else - boost::asio::connect(_socket, iterator); - #endif + boost::asio::connect(_socket, iterator); UHD_LOG << "rpc_client connected to server." << std::endl; @@ -109,11 +94,6 @@ rpc_client::rpc_client ( } catch (boost::exception&) { UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl; _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); -#if BOOST_VERSION < 104700 - } catch (std::exception& e) { - UHD_LOG << "rpc_client connection error: " << e.what() << std::endl; - _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); -#endif } } diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 8bfa1973a..541d9f3bc 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -24,7 +24,6 @@ #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/tasks.hpp> -#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> @@ -35,7 +34,6 @@ #include <boost/format.hpp> #include <boost/bind.hpp> #include <boost/make_shared.hpp> -#include <boost/thread/barrier.hpp> #include <iostream> #include <vector> @@ -92,22 +90,15 @@ public: } ~recv_packet_handler(void){ - _task_barrier.interrupt(); - _task_handlers.clear(); + /* NOP */ } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; - _task_handlers.clear(); _props.resize(size); //re-initialize all buffers infos by re-creating the vector _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size)); - _task_barrier.resize(size); - _task_handlers.resize(size); - for (size_t i = 1/*skip 0*/; i < size; i++){ - _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i)); - }; } //! Get the channel width of this handler @@ -127,7 +118,7 @@ public: * \param threshold number of packets per channel */ void set_alignment_failure_threshold(const size_t threshold){ - _alignment_faulure_threshold = threshold*this->size(); + _alignment_failure_threshold = threshold*this->size(); } //! Set the rate of ticks per second @@ -203,6 +194,12 @@ public: //! Overload call to issue stream commands void issue_stream_cmd(const stream_cmd_t &stream_cmd) { + if (stream_cmd.stream_now + and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS + and _props.size() > 1) { + throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. 
Aborting."); + } + for (size_t i = 0; i < _props.size(); i++) { if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd); @@ -234,7 +231,7 @@ public: buffs, nsamps_per_buff, metadata, timeout ); - if (one_packet){ + if (one_packet or metadata.end_of_burst){ #ifdef UHD_TXRX_DEBUG_PRINTS dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); #endif @@ -242,7 +239,9 @@ public: } //first recv had an error code set, return immediately - if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + return accum_num_samps; + } //loop until buffer is filled or error code while(accum_num_samps < nsamps_per_buff){ @@ -256,10 +255,16 @@ public: _queue_error_for_next_call = true; break; } + accum_num_samps += num_samps; + + //return immediately if end of burst + if (_queue_metadata.end_of_burst) { + break; + } } #ifdef UHD_TXRX_DEBUG_PRINTS - dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); #endif return accum_num_samps; } @@ -269,7 +274,7 @@ private: size_t _header_offset_words32; double _tick_rate, _samp_rate; bool _queue_error_for_next_call; - size_t _alignment_faulure_threshold; + size_t _alignment_failure_threshold; rx_metadata_t _queue_metadata; struct xport_chan_props_type{ xport_chan_props_type(void): @@ -576,6 +581,9 @@ private: case PACKET_TIMEOUT_ERROR: std::swap(curr_info, next_info); //save progress from curr -> next + if(_props[index].handle_flowctrl) { + _props[index].handle_flowctrl(next_info[index].ifpi.packet_count); + } curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; return; @@ -593,7 +601,7 @@ private: } //too many iterations: detect alignment failure - if (iterations++ > _alignment_faulure_threshold){ + if (iterations++ > _alignment_failure_threshold){ UHD_MSG(error) << boost::format( "The receive packet handler failed to time-align packets.\n" "%u received packets were processed by the handler.\n" @@ -657,7 +665,9 @@ private: _convert_bytes_to_copy = bytes_to_copy; //perform N channels of conversion - converter_thread_task(0); + for (size_t i = 0; i < this->size(); i++) { + convert_to_out_buff(i); + } //update the copy buffer's availability info.data_bytes_to_copy -= bytes_to_copy; @@ -670,15 +680,15 @@ private: return nsamps_to_copy_per_io_buff; } - /******************************************************************* - * Perform one thread's work of the conversion task. - * The entry and exit use a dual synchronization barrier, - * to wait for data to become ready and block until completion. - ******************************************************************/ - UHD_INLINE void converter_thread_task(const size_t index) + /*! Run the conversion from the internal buffers to the user's output + * buffer. + * + * - Calls the converter + * - Releases internal data buffers + * - Updates read/write pointers + */ + inline void convert_to_out_buff(const size_t index) { - _task_barrier.wait(); - //shortcut references to local data structures buffers_info_type &buff_info = get_curr_buffer_info(); per_buffer_info_type &info = buff_info[index]; @@ -702,13 +712,9 @@ private: if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){ info.buff.reset(); //effectively a release } - - if (index == 0) _task_barrier.wait_others(); } //! 
Shared variables for the worker threads - reusable_barrier _task_barrier; - std::vector<task::sptr> _task_handlers; size_t _convert_nsamps; const rx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index c2810842e..7f43168a0 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -24,11 +24,11 @@ #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/tasks.hpp> -#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> +#include <boost/thread/thread.hpp> #include <boost/thread/thread_time.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> @@ -74,22 +74,15 @@ public: } ~send_packet_handler(void){ - _task_barrier.interrupt(); - _task_handlers.clear(); + /* NOP */ } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; - _task_handlers.clear(); _props.resize(size); static const boost::uint64_t zero = 0; _zero_buffs.resize(size, &zero); - _task_barrier.resize(size); - _task_handlers.resize(size); - for (size_t i = 1/*skip 0*/; i < size; i++){ - _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i)); - }; } //! Get the channel width of this handler @@ -377,21 +370,23 @@ private: _convert_if_packet_info = &if_packet_info; //perform N channels of conversion - converter_thread_task(0); + for (size_t i = 0; i < this->size(); i++) { + convert_to_in_buff(i); + } _next_packet_seq++; //increment sequence after commits return nsamps_per_buff; } - /******************************************************************* - * Perform one thread's work of the conversion task. - * The entry and exit use a dual synchronization barrier, - * to wait for data to become ready and block until completion. - ******************************************************************/ - UHD_INLINE void converter_thread_task(const size_t index) + /*! Run the conversion from the internal buffers to the user's input + * buffer. + * + * - Calls the converter + * - Releases internal data buffers + * - Updates read/write pointers + */ + UHD_INLINE void convert_to_in_buff(const size_t index) { - _task_barrier.wait(); - //shortcut references to local data structures managed_send_buffer::sptr &buff = _props[index].buff; vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; @@ -419,13 +414,9 @@ private: const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; buff->commit(num_vita_words32*sizeof(boost::uint32_t)); buff.reset(); //effectively a release - - if (index == 0) _task_barrier.wait_others(); } //! 
Shared variables for the worker threads - reusable_barrier _task_barrier; - std::vector<task::sptr> _task_handlers; size_t _convert_nsamps; const tx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; diff --git a/host/lib/transport/zero_copy_recv_offload.cpp b/host/lib/transport/zero_copy_recv_offload.cpp new file mode 100644 index 000000000..e8b013abc --- /dev/null +++ b/host/lib/transport/zero_copy_recv_offload.cpp @@ -0,0 +1,158 @@ +// +// Copyright 2016 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/zero_copy_recv_offload.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/safe_call.hpp> +#include <boost/format.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include <boost/bind.hpp> + +using namespace uhd; +using namespace uhd::transport; + +typedef bounded_buffer<managed_recv_buffer::sptr> bounded_buffer_t; + +/*********************************************************************** + * Zero copy offload transport: + * An intermediate transport that utilizes threading to free + * the main thread from any receive work. 
+ **********************************************************************/ +class zero_copy_recv_offload_impl : public zero_copy_recv_offload { +public: + typedef boost::shared_ptr<zero_copy_recv_offload_impl> sptr; + + zero_copy_recv_offload_impl(zero_copy_if::sptr transport, + const double timeout) : + _transport(transport), _timeout(timeout), + _inbox(transport->get_num_recv_frames()), + _recv_done(false) + { + UHD_LOG << "Created threaded transport" << std::endl; + + // Create the receive and send threads to offload + // the system calls onto other threads + _recv_thread = boost::thread( + boost::bind(&zero_copy_recv_offload_impl::enqueue_recv, this) + ); + } + + // Receive thread flags + void set_recv_done() + { + boost::lock_guard<boost::mutex> guard(_recv_mutex); + _recv_done = true; + } + + bool is_recv_done() + { + boost::lock_guard<boost::mutex> guard(_recv_mutex); + return _recv_done; + } + + ~zero_copy_recv_offload_impl() + { + // Signal the threads we're finished + set_recv_done(); + + // Wait for them to join + UHD_SAFE_CALL( + _recv_thread.join(); + ) + } + + // The receive thread function is responsible for + // pulling pointers to managed receiver buffers quickly + void enqueue_recv() + { + while (not is_recv_done()) { + managed_recv_buffer::sptr buff = _transport->get_recv_buff(_timeout); + if (not buff) continue; + _inbox.push_with_timed_wait(buff, _timeout); + } + } + + /******************************************************************* + * Receive implementation: + * Pop the receive buffer pointer from the underlying transport + ******************************************************************/ + managed_recv_buffer::sptr get_recv_buff(double timeout) + { + managed_recv_buffer::sptr ptr; + _inbox.pop_with_timed_wait(ptr, timeout); + return ptr; + } + + size_t get_num_recv_frames() const + { + return _transport->get_num_recv_frames(); + } + + size_t get_recv_frame_size() const + { + return _transport->get_recv_frame_size(); + } + + /******************************************************************* + * Send implementation: + * Pass the send buffer pointer from the underlying transport + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double timeout) + { + return _transport->get_send_buff(timeout); + } + + size_t get_num_send_frames() const + { + return _transport->get_num_send_frames(); + } + + size_t get_send_frame_size() const + { + return _transport->get_send_frame_size(); + } + +private: + // The linked transport + zero_copy_if::sptr _transport; + + const double _timeout; + + // Shared buffers + bounded_buffer_t _inbox; + + // Threading + bool _recv_done; + boost::thread _recv_thread; + boost::mutex _recv_mutex; +}; + +zero_copy_recv_offload::sptr zero_copy_recv_offload::make( + zero_copy_if::sptr transport, + const double timeout) +{ + zero_copy_recv_offload_impl::sptr zero_copy_recv_offload( + new zero_copy_recv_offload_impl(transport, timeout) + ); + + return zero_copy_recv_offload; +} |
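Usage sketch for the new muxed_zero_copy_if.cpp above: the mux demultiplexes one underlying zero_copy_if into per-stream child transports using a caller-supplied classifier that is invoked on every received frame. This is a minimal sketch only; the classifier signature uint32_t(void*, size_t) is inferred from how _classify is called in _process_next_buffer, and extract_stream_id() is a hypothetical placeholder for whatever header parsing (e.g. CHDR SID inspection) a real caller would do.

// Sketch, not part of the change set. extract_stream_id() is hypothetical;
// a real classifier would parse the packet header and handle endianness.
#include <uhd/transport/muxed_zero_copy_if.hpp>
#include <uhd/transport/zero_copy.hpp>
#include <boost/cstdint.hpp>

static boost::uint32_t extract_stream_id(void *buff, size_t /*frame_size*/)
{
    // Hypothetical rule: classify on the low byte of the first 32-bit word.
    return *static_cast<const boost::uint32_t *>(buff) & 0xff;
}

void demo_mux(uhd::transport::zero_copy_if::sptr base_xport)
{
    using namespace uhd::transport;

    // One mux over the base transport, allowing up to 4 child streams
    muxed_zero_copy_if::sptr mux =
        muxed_zero_copy_if::make(base_xport, &extract_stream_id, 4);

    // Each child behaves like an independent zero_copy_if on receive;
    // send calls pass straight through to the base transport.
    zero_copy_if::sptr stream0 = mux->make_stream(0);
    managed_recv_buffer::sptr buff = stream0->get_recv_buff(0.1 /*seconds*/);
    if (buff) {
        // process buff->cast<const boost::uint32_t*>() ...
    }
}

Note that the mux only holds weak pointers to child streams: destroying a child removes it from the map, and frames that classify to a stream with no live owner are counted in get_num_dropped_frames().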
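The new guard in recv_packet_handler::issue_stream_cmd() rejects stream_now == true on a multi-channel handler (except for STOP_CONTINUOUS), because an immediate start cannot be time-aligned across channels. The pattern the check pushes callers toward is a timed start through the public streamer API, sketched below with an assumed 100 ms lead time.

// Sketch of a timed multi-channel start; the 100 ms offset is an assumption.
#include <uhd/usrp/multi_usrp.hpp>
#include <uhd/stream.hpp>
#include <uhd/types/stream_cmd.hpp>

void start_aligned_rx(uhd::usrp::multi_usrp::sptr usrp,
                      uhd::rx_streamer::sptr rx_stream)
{
    uhd::stream_cmd_t cmd(uhd::stream_cmd_t::STREAM_MODE_START_CONTINUOUS);
    cmd.stream_now = false; // required for more than one channel after this change
    // Start slightly in the future so every channel triggers on the same tick.
    cmd.time_spec = usrp->get_time_now() + uhd::time_spec_t(0.1);
    rx_stream->issue_stream_cmd(cmd);
}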
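Finally, zero_copy_recv_offload.cpp wraps an existing transport so that the blocking get_recv_buff() call on the base transport runs in a dedicated thread, while the caller only pops already-received buffers from a bounded queue. A minimal wrap-and-receive sketch, assuming the factory signature shown at the bottom of that file (make(transport, timeout)):

// Sketch only: wraps a transport with the receive-offload thread.
#include <uhd/transport/zero_copy_recv_offload.hpp>
#include <uhd/transport/zero_copy.hpp>

void demo_offload(uhd::transport::zero_copy_if::sptr base_xport)
{
    using namespace uhd::transport;

    // The wrapper polls base_xport->get_recv_buff() in its own thread and
    // parks buffers in a queue sized to the transport's recv frame count.
    zero_copy_if::sptr offloaded =
        zero_copy_recv_offload::make(base_xport, 0.001 /*poll timeout, s*/);

    // The caller's receive path now only waits on the internal queue.
    managed_recv_buffer::sptr rx_buff = offloaded->get_recv_buff(0.1);

    // Send calls are forwarded unchanged to the base transport.
    managed_send_buffer::sptr tx_buff = offloaded->get_send_buff(0.1);
    (void)rx_buff; (void)tx_buff;
}

The poll timeout passed to make() bounds how long the offload thread blocks per iteration, which in turn bounds how long the destructor can take to join the thread after set_recv_done() is called.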