diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 19 | ||||
-rw-r--r-- | host/lib/transport/libusb1_base.cpp | 62 | ||||
-rw-r--r-- | host/lib/transport/libusb1_base.hpp | 12 | ||||
-rw-r--r-- | host/lib/transport/libusb1_control.cpp | 24 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 47 | ||||
-rw-r--r-- | host/lib/transport/muxed_zero_copy_if.cpp | 250 | ||||
-rw-r--r-- | host/lib/transport/nirio/lvbitx/CMakeLists.txt | 5 | ||||
-rw-r--r-- | host/lib/transport/nirio/rpc/rpc_client.cpp | 24 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 127 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 65 | ||||
-rw-r--r-- | host/lib/transport/usb_dummy_impl.cpp | 12 | ||||
-rw-r--r-- | host/lib/transport/zero_copy_recv_offload.cpp | 158 |
12 files changed, 667 insertions, 138 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 6abc399b4..44c8d59af 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -22,17 +22,15 @@ ######################################################################## # Include subdirectories (different than add) ######################################################################## -INCLUDE_SUBDIRECTORY(nirio) +IF(ENABLE_X300) + INCLUDE_SUBDIRECTORY(nirio) +ENDIF(ENABLE_X300) ######################################################################## # Setup libusb ######################################################################## -MESSAGE(STATUS "") -FIND_PACKAGE(USB1) - -LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF OFF) - IF(ENABLE_USB) + MESSAGE(STATUS "") MESSAGE(STATUS "USB support enabled via libusb.") INCLUDE_DIRECTORIES(${LIBUSB_INCLUDE_DIRS}) LIBUHD_APPEND_LIBS(${LIBUSB_LIBRARIES}) @@ -124,14 +122,21 @@ LIBUHD_PYTHON_GEN_SOURCE( ) LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp ) +IF(ENABLE_X300) + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp + ) +ENDIF(ENABLE_X300) + # Verbose Debug output for send/recv SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" ) OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" ) diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index f92117a9e..7b9e11da9 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -47,10 +47,7 @@ public: task_handler = task::make(boost::bind(&libusb_session_impl::libusb_event_handler_task, this, _context)); } - ~libusb_session_impl(void){ - task_handler.reset(); - libusb_exit(_context); - } + virtual ~libusb_session_impl(void); libusb_context *get_context(void) const{ return _context; @@ -86,6 +83,11 @@ private: } }; +libusb_session_impl::~libusb_session_impl(void){ + task_handler.reset(); + libusb_exit(_context); +} + libusb::session::sptr libusb::session::get_global_session(void){ static boost::weak_ptr<session> global_session; @@ -121,9 +123,7 @@ public: _dev = dev; } - ~libusb_device_impl(void){ - libusb_unref_device(this->get()); - } + virtual ~libusb_device_impl(void); libusb_device *get(void) const{ return _dev; @@ -134,6 +134,10 @@ private: libusb_device *_dev; }; +libusb_device_impl::~libusb_device_impl(void){ + libusb_unref_device(this->get()); +} + /*********************************************************************** * libusb device list **********************************************************************/ @@ -160,6 +164,8 @@ public: libusb_free_device_list(dev_list, false/*dont unref*/); } + virtual ~libusb_device_list_impl(void); + size_t size(void) const{ return _devs.size(); } @@ -172,6 +178,10 @@ private: std::vector<libusb::device::sptr> _devs; }; +libusb_device_list_impl::~libusb_device_list_impl(void){ + /* NOP */ +} + libusb::device_list::sptr libusb::device_list::make(void){ return sptr(new libusb_device_list_impl()); } @@ -190,6 +200,8 @@ public: UHD_ASSERT_THROW(libusb_get_device_descriptor(_dev->get(), &_desc) == 0); } + virtual ~libusb_device_descriptor_impl(void); + const libusb_device_descriptor &get(void) const{ return _desc; } @@ -207,12 +219,12 @@ public: ); unsigned char buff[512]; - ssize_t ret = libusb_get_string_descriptor_ascii( - handle->get(), off, buff, sizeof(buff) + int ret = libusb_get_string_descriptor_ascii( + handle->get(), off, buff, int(sizeof(buff)) ); if (ret < 0) return ""; //on error, just return empty string - std::string string_descriptor((char *)buff, ret); + std::string string_descriptor((char *)buff, size_t(ret)); byte_vector_t string_vec(string_descriptor.begin(), string_descriptor.end()); std::string out; BOOST_FOREACH(boost::uint8_t byte, string_vec){ @@ -227,6 +239,10 @@ private: libusb_device_descriptor _desc; }; +libusb_device_descriptor_impl::~libusb_device_descriptor_impl(void){ + /* NOP */ +} + libusb::device_descriptor::sptr libusb::device_descriptor::make(device::sptr dev){ return sptr(new libusb_device_descriptor_impl(dev)); } @@ -245,13 +261,7 @@ public: UHD_ASSERT_THROW(libusb_open(_dev->get(), &_handle) == 0); } - ~libusb_device_handle_impl(void){ - //release all claimed interfaces - for (size_t i = 0; i < _claimed.size(); i++){ - libusb_release_interface(this->get(), _claimed[i]); - } - libusb_close(_handle); - } + virtual ~libusb_device_handle_impl(void); libusb_device_handle *get(void) const{ return _handle; @@ -283,6 +293,14 @@ private: std::vector<int> _claimed; }; +libusb_device_handle_impl::~libusb_device_handle_impl(void){ + //release all claimed interfaces + for (size_t i = 0; i < _claimed.size(); i++){ + libusb_release_interface(this->get(), _claimed[i]); + } + libusb_close(_handle); +} + libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::sptr dev){ static uhd::dict<libusb_device *, boost::weak_ptr<device_handle> > handles; @@ -327,6 +345,8 @@ public: _dev = dev; } + virtual ~libusb_special_handle_impl(void); + libusb::device::sptr get_device(void) const{ return _dev; } @@ -361,6 +381,10 @@ private: libusb::device::sptr _dev; //always keep a reference to device }; +libusb_special_handle_impl::~libusb_special_handle_impl(void){ + /* NOP */ +} + libusb::special_handle::sptr libusb::special_handle::make(device::sptr dev){ return sptr(new libusb_special_handle_impl(dev)); } @@ -368,6 +392,10 @@ libusb::special_handle::sptr libusb::special_handle::make(device::sptr dev){ /*********************************************************************** * list device handles implementations **********************************************************************/ +usb_device_handle::~usb_device_handle(void) { + /* NOP */ +} + std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list( boost::uint16_t vid, boost::uint16_t pid ){ diff --git a/host/lib/transport/libusb1_base.hpp b/host/lib/transport/libusb1_base.hpp index 2ff1291d9..6d104bc75 100644 --- a/host/lib/transport/libusb1_base.hpp +++ b/host/lib/transport/libusb1_base.hpp @@ -67,7 +67,7 @@ namespace libusb { public: typedef boost::shared_ptr<session> sptr; - virtual ~session(void) = 0; + virtual ~session(void); /*! * Level 0: no messages ever printed by the library (default) @@ -92,7 +92,7 @@ namespace libusb { public: typedef boost::shared_ptr<device> sptr; - virtual ~device(void) = 0; + virtual ~device(void); //! get the underlying device pointer virtual libusb_device *get(void) const = 0; @@ -106,7 +106,7 @@ namespace libusb { public: typedef boost::shared_ptr<device_list> sptr; - virtual ~device_list(void) = 0; + virtual ~device_list(void); //! make a new device list static sptr make(void); @@ -125,7 +125,7 @@ namespace libusb { public: typedef boost::shared_ptr<device_descriptor> sptr; - virtual ~device_descriptor(void) = 0; + virtual ~device_descriptor(void); //! make a new descriptor from a device reference static sptr make(device::sptr); @@ -143,7 +143,7 @@ namespace libusb { public: typedef boost::shared_ptr<device_handle> sptr; - virtual ~device_handle(void) = 0; + virtual ~device_handle(void); //! get a cached handle or make a new one given the device static sptr get_cached_handle(device::sptr); @@ -172,7 +172,7 @@ namespace libusb { public: typedef boost::shared_ptr<special_handle> sptr; - virtual ~special_handle(void) = 0; + virtual ~special_handle(void); //! make a new special handle from device static sptr make(device::sptr); diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp index 00c113163..a18f657d9 100644 --- a/host/lib/transport/libusb1_control.cpp +++ b/host/lib/transport/libusb1_control.cpp @@ -30,19 +30,21 @@ usb_control::~usb_control(void){ **********************************************************************/ class libusb_control_impl : public usb_control { public: - libusb_control_impl(libusb::device_handle::sptr handle, const size_t interface): + libusb_control_impl(libusb::device_handle::sptr handle, const int interface): _handle(handle) { _handle->claim_interface(interface); } - ssize_t submit(boost::uint8_t request_type, - boost::uint8_t request, - boost::uint16_t value, - boost::uint16_t index, - unsigned char *buff, - boost::uint16_t length, - boost::int32_t libusb_timeout = 0 + virtual ~libusb_control_impl(void); + + int submit(boost::uint8_t request_type, + boost::uint8_t request, + boost::uint16_t value, + boost::uint16_t index, + unsigned char *buff, + boost::uint16_t length, + boost::uint32_t libusb_timeout = 0 ){ boost::mutex::scoped_lock lock(_mutex); return libusb_control_transfer(_handle->get(), @@ -60,10 +62,14 @@ private: boost::mutex _mutex; }; +libusb_control_impl::~libusb_control_impl(void) { + /* NOP */ +} + /*********************************************************************** * USB control public make functions **********************************************************************/ -usb_control::sptr usb_control::make(usb_device_handle::sptr handle, const size_t interface){ +usb_control::sptr usb_control::make(usb_device_handle::sptr handle, const int interface){ return sptr(new libusb_control_impl(libusb::device_handle::get_cached_handle( boost::static_pointer_cast<libusb::special_handle>(handle)->get_device() ), interface)); diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 53e345009..c32b96b63 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -125,19 +125,21 @@ public: _ctx(libusb::session::get_global_session()->get_context()), _lut(lut), _frame_size(frame_size) { /* NOP */ } + virtual ~libusb_zero_copy_mb(void); + void release(void){ _release_cb(this); } UHD_INLINE void submit(void) { - _lut->length = (_is_recv)? _frame_size : size(); //always set length + _lut->length = int((_is_recv)? _frame_size : size()); //always set length #ifdef UHD_TXRX_DEBUG_PRINTS result.start_time = boost::get_system_time().time_of_day().total_microseconds(); result.buff_num = num(); result.is_recv = _is_recv; #endif - const int ret = libusb_submit_transfer(_lut); + int ret = libusb_submit_transfer(_lut); if (ret != LIBUSB_SUCCESS) throw uhd::usb_error(ret, str(boost::format( "usb %s submit failed: %s") % _name % libusb_error_name(ret))); @@ -152,7 +154,7 @@ public: throw uhd::io_error(str(boost::format("usb %s transfer status: %d") % _name % libusb_error_name(result.status))); result.completed = 0; - return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size); + return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? size_t(result.actual_length) : _frame_size); } return typename buffer_type::sptr(); } @@ -189,6 +191,10 @@ private: const size_t _frame_size; }; +libusb_zero_copy_mb::~libusb_zero_copy_mb(void) { + /* NOP */ +} + /*********************************************************************** * USB zero_copy device class **********************************************************************/ @@ -197,7 +203,7 @@ class libusb_zero_copy_single public: libusb_zero_copy_single( libusb::device_handle::sptr handle, - const size_t interface, const size_t endpoint, + const int interface, const unsigned char endpoint, const size_t num_frames, const size_t frame_size ): _handle(handle), @@ -221,7 +227,7 @@ public: _handle->get(), // dev_handle endpoint, // endpoint static_cast<unsigned char *>(buff), - sizeof(buff), + int(sizeof(buff)), &transfered, //bytes xfered 10 //timeout ms ); @@ -243,7 +249,7 @@ public: _handle->get(), // dev_handle endpoint, // endpoint static_cast<unsigned char *>(_buffer_pool->at(i)), // buffer - this->get_frame_size(), // length + int(this->get_frame_size()), // length libusb_transfer_cb_fn(&libusb_async_cb), // callback static_cast<void *>(&_mb_pool.back()->result), // user_data 0 // timeout (ms) @@ -372,10 +378,10 @@ struct libusb_zero_copy_impl : usb_zero_copy { libusb_zero_copy_impl( libusb::device_handle::sptr handle, - const size_t recv_interface, - const size_t recv_endpoint, - const size_t send_interface, - const size_t send_endpoint, + const int recv_interface, + const unsigned char recv_endpoint, + const int send_interface, + const unsigned char send_endpoint, const device_addr_t &hints ){ _recv_impl.reset(new libusb_zero_copy_single( @@ -388,6 +394,8 @@ struct libusb_zero_copy_impl : usb_zero_copy size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE)))); } + virtual ~libusb_zero_copy_impl(void); + managed_recv_buffer::sptr get_recv_buff(double timeout) { boost::mutex::scoped_lock l(_recv_mutex); @@ -410,15 +418,26 @@ struct libusb_zero_copy_impl : usb_zero_copy boost::mutex _recv_mutex, _send_mutex; }; +libusb_zero_copy_impl::~libusb_zero_copy_impl(void) { + /* NOP */ +} + +/*********************************************************************** + * USB zero_copy destructor + **********************************************************************/ +usb_zero_copy::~usb_zero_copy(void) { + /* NOP */ +} + /*********************************************************************** * USB zero_copy make functions **********************************************************************/ usb_zero_copy::sptr usb_zero_copy::make( usb_device_handle::sptr handle, - const size_t recv_interface, - const size_t recv_endpoint, - const size_t send_interface, - const size_t send_endpoint, + const int recv_interface, + const unsigned char recv_endpoint, + const int send_interface, + const unsigned char send_endpoint, const device_addr_t &hints ){ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp new file mode 100644 index 000000000..996db3c98 --- /dev/null +++ b/host/lib/transport/muxed_zero_copy_if.cpp @@ -0,0 +1,250 @@ +// +// Copyright 2016 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/muxed_zero_copy_if.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/safe_call.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread.hpp> +#include <boost/thread/locks.hpp> +#include <map> + +using namespace uhd; +using namespace uhd::transport; + +class muxed_zero_copy_if_impl : public muxed_zero_copy_if, + public boost::enable_shared_from_this<muxed_zero_copy_if_impl> +{ +public: + typedef boost::shared_ptr<muxed_zero_copy_if_impl> sptr; + + muxed_zero_copy_if_impl( + zero_copy_if::sptr base_xport, + stream_classifier_fn classify_fn, + size_t max_streams + ): + _base_xport(base_xport), _classify(classify_fn), + _max_num_streams(max_streams), _num_dropped_frames(0) + { + //Create the receive thread to poll the underlying transport + //and classify packets into queues + _recv_thread = boost::thread( + boost::bind(&muxed_zero_copy_if_impl::_update_queues, this)); + } + + virtual ~muxed_zero_copy_if_impl() + { + UHD_SAFE_CALL( + //Interrupt buffer updater loop + _recv_thread.interrupt(); + //Wait for loop to finish + //No timeout on join. The recv loop is guaranteed + //to terminate in a reasonable amount of time because + //there are no timed blocks on the underlying. + _recv_thread.join(); + //Flush base transport + while (_base_xport->get_recv_buff(0.0001)) /*NOP*/; + //Release child streams + //Note that this will not delete or flush the child streams + //until the owners of the streams have released the respective + //shared pointers. This ensures that packets are not dropped. + _streams.clear(); + ); + } + + virtual zero_copy_if::sptr make_stream(const uint32_t stream_num) + { + boost::lock_guard<boost::mutex> lock(_mutex); + if (_streams.size() >= _max_num_streams) { + throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams."); + } + stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num); + _streams[stream_num] = stream; + return stream; + } + + virtual size_t get_num_dropped_frames() const + { + return _num_dropped_frames; + } + + void remove_stream(const uint32_t stream_num) + { + boost::lock_guard<boost::mutex> lock(_mutex); + _streams.erase(stream_num); + } + +private: + class stream_impl : public zero_copy_if + { + public: + typedef boost::shared_ptr<stream_impl> sptr; + typedef boost::weak_ptr<stream_impl> wptr; + + stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num): + _stream_num(stream_num), _muxed_xport(muxed_xport), + _buff_queue(muxed_xport->base_xport()->get_num_recv_frames()) + { + } + + ~stream_impl(void) + { + //First remove the stream from muxed transport + //so no more frames are pushed in + _muxed_xport->remove_stream(_stream_num); + //Flush the transport + managed_recv_buffer::sptr buff; + while (_buff_queue.pop_with_haste(buff)) { + //NOP + } + } + + size_t get_num_recv_frames(void) const { + return _muxed_xport->base_xport()->get_num_recv_frames(); + } + + size_t get_recv_frame_size(void) const { + return _muxed_xport->base_xport()->get_recv_frame_size(); + } + + managed_recv_buffer::sptr get_recv_buff(double timeout) { + managed_recv_buffer::sptr buff; + if (_buff_queue.pop_with_timed_wait(buff, timeout)) { + return buff; + } else { + return managed_recv_buffer::sptr(); + } + + } + + void push_recv_buff(managed_recv_buffer::sptr buff) { + _buff_queue.push_with_wait(buff); + } + + size_t get_num_send_frames(void) const { + return _muxed_xport->base_xport()->get_num_send_frames(); + } + + size_t get_send_frame_size(void) const { + return _muxed_xport->base_xport()->get_send_frame_size(); + } + + managed_send_buffer::sptr get_send_buff(double timeout) + { + return _muxed_xport->base_xport()->get_send_buff(timeout); + } + + private: + const uint32_t _stream_num; + muxed_zero_copy_if_impl::sptr _muxed_xport; + bounded_buffer<managed_recv_buffer::sptr> _buff_queue; + }; + + inline zero_copy_if::sptr& base_xport() { return _base_xport; } + + void _update_queues() + { + //Run forever: + // - Pull packets from the base transport + // - Classify them + // - Push them to the appropriate receive queue + while (true) { + { //Uninterruptable block of code + boost::this_thread::disable_interruption interrupt_disabler; + if (not _process_next_buffer()) { + //Be a good citizen and yield if no packet is processed + static const size_t MIN_DUR = 1; + boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR)); + //We call sleep(MIN_DUR) above instead of yield() to ensure that we + //relinquish the current scheduler time slot. + //yield() is a hint to the scheduler to end the time + //slice early and schedule in another thread that is ready to run. + //However in most situations, there will be no other thread and + //this thread will continue to run which will rail a CPU core. + //We call sleep(MIN_DUR=1) instead which will sleep for a minimum time. + //Ideally we would like to use boost::chrono::.*seconds::min() but that + //is bound to 0, which causes the sleep_for call to be a no-op and + //thus useless to actually force a sleep. + + //**************************************************************** + //NOTE: This behavior makes this transport a poor choice for + // low latency communication. + //**************************************************************** + } + } + //Check if the master thread has requested a shutdown + if (boost::this_thread::interruption_requested()) break; + } + } + + bool _process_next_buffer() + { + managed_recv_buffer::sptr buff = _base_xport->get_recv_buff(0.0); + if (buff) { + stream_impl::sptr stream; + try { + const uint32_t stream_num = _classify(buff->cast<void*>(), _base_xport->get_recv_frame_size()); + { + //Hold the stream mutex long enough to pull a bounded buffer + //and lock it (increment its ref count). + boost::lock_guard<boost::mutex> lock(_mutex); + stream_map_t::iterator str_iter = _streams.find(stream_num); + if (str_iter != _streams.end()) { + stream = (*str_iter).second.lock(); + } + } + } catch (std::exception&) { + //If _classify throws we simply drop the frame + } + //Once a bounded buffer is acquired, we can rely on its + //thread safety to serialize with the consumer. + if (stream.get()) { + stream->push_recv_buff(buff); + } else { + boost::lock_guard<boost::mutex> lock(_mutex); + _num_dropped_frames++; + } + //We processed a packet, and there could be more coming + //Don't yield in the next iteration. + return true; + } else { + //The base transport is idle. Return false to let the + //thread yield. + return false; + } + } + + typedef std::map<uint32_t, stream_impl::wptr> stream_map_t; + + zero_copy_if::sptr _base_xport; + stream_classifier_fn _classify; + stream_map_t _streams; + const size_t _max_num_streams; + size_t _num_dropped_frames; + boost::thread _recv_thread; + boost::mutex _mutex; +}; + +muxed_zero_copy_if::sptr muxed_zero_copy_if::make( + zero_copy_if::sptr base_xport, + muxed_zero_copy_if::stream_classifier_fn classify_fn, + size_t max_streams +) { + return boost::make_shared<muxed_zero_copy_if_impl>(base_xport, classify_fn, max_streams); +} diff --git a/host/lib/transport/nirio/lvbitx/CMakeLists.txt b/host/lib/transport/nirio/lvbitx/CMakeLists.txt index b9a2a9f15..5741a12f8 100644 --- a/host/lib/transport/nirio/lvbitx/CMakeLists.txt +++ b/host/lib/transport/nirio/lvbitx/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2013 Ettus Research LLC +# Copyright 2013,2015 Ettus Research LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -30,8 +30,8 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile) SET(IMAGES_PATH_OPT --uhd-images-path=${UHD_IMAGES_DIR}) ADD_CUSTOM_COMMAND( - OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp + OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/process-lvbitx.py DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.hpp DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.cpp @@ -41,6 +41,7 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile) ) #make libuhd depend on the output file + LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp) LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp) ENDMACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM) diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp index bbaf9f235..3d62b57ae 100644 --- a/host/lib/transport/nirio/rpc/rpc_client.cpp +++ b/host/lib/transport/nirio/rpc/rpc_client.cpp @@ -1,5 +1,5 @@ /// -// Copyright 2013 Ettus Research LLC +// Copyright 2013,2016 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -55,22 +55,7 @@ rpc_client::rpc_client ( tcp::resolver::query::flags query_flags(tcp::resolver::query::passive); tcp::resolver::query query(tcp::v4(), server, port, query_flags); tcp::resolver::iterator iterator = resolver.resolve(query); - - #if BOOST_VERSION < 104700 - // default constructor creates end iterator - tcp::resolver::iterator end; - - boost::system::error_code error = boost::asio::error::host_not_found; - while (error && iterator != end) - { - _socket.close(); - _socket.connect(*iterator++, error); - } - if (error) - throw boost::system::system_error(error); - #else - boost::asio::connect(_socket, iterator); - #endif + boost::asio::connect(_socket, iterator); UHD_LOG << "rpc_client connected to server." << std::endl; @@ -109,11 +94,6 @@ rpc_client::rpc_client ( } catch (boost::exception&) { UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl; _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); -#if BOOST_VERSION < 104700 - } catch (std::exception& e) { - UHD_LOG << "rpc_client connection error: " << e.what() << std::endl; - _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category()); -#endif } } diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 8bfa1973a..5ca1da687 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -24,18 +24,19 @@ #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/tasks.hpp> -#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> +#ifdef DEVICE3_STREAMER +# include "../rfnoc/rx_stream_terminator.hpp" +#endif #include <boost/dynamic_bitset.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> #include <boost/format.hpp> #include <boost/bind.hpp> #include <boost/make_shared.hpp> -#include <boost/thread/barrier.hpp> #include <iostream> #include <vector> @@ -92,22 +93,15 @@ public: } ~recv_packet_handler(void){ - _task_barrier.interrupt(); - _task_handlers.clear(); + /* NOP */ } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; - _task_handlers.clear(); _props.resize(size); //re-initialize all buffers infos by re-creating the vector _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size)); - _task_barrier.resize(size); - _task_handlers.resize(size); - for (size_t i = 1/*skip 0*/; i < size; i++){ - _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i)); - }; } //! Get the channel width of this handler @@ -121,13 +115,42 @@ public: _header_offset_words32 = header_offset_words32; } + ////////////////// RFNOC /////////////////////////// + //! Set the stream ID for a specific channel (or no SID) + void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){ + _props.at(xport_chan).has_sid = has_sid; + _props.at(xport_chan).sid = sid; + } + + //! Get the stream ID for a specific channel (or zero if no SID) + boost::uint32_t get_xport_chan_sid(const size_t xport_chan) const { + if (_props.at(xport_chan).has_sid) { + return _props.at(xport_chan).sid; + } else { + return 0; + } + } + + #ifdef DEVICE3_STREAMER + void set_terminator(uhd::rfnoc::rx_stream_terminator::sptr terminator) + { + _terminator = terminator; + } + + uhd::rfnoc::rx_stream_terminator::sptr get_terminator() + { + return _terminator; + } + #endif + ////////////////// RFNOC /////////////////////////// + /*! * Set the threshold for alignment failure. * How many packets throw out before giving up? * \param threshold number of packets per channel */ void set_alignment_failure_threshold(const size_t threshold){ - _alignment_faulure_threshold = threshold*this->size(); + _alignment_failure_threshold = threshold*this->size(); } //! Set the rate of ticks per second @@ -203,6 +226,13 @@ public: //! Overload call to issue stream commands void issue_stream_cmd(const stream_cmd_t &stream_cmd) { + // RFNoC: This needs to be checked by the radio block, once it's done. TODO remove this. + //if (stream_cmd.stream_now + //and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS + //and _props.size() > 1) { + //throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting."); + //} + for (size_t i = 0; i < _props.size(); i++) { if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd); @@ -234,7 +264,7 @@ public: buffs, nsamps_per_buff, metadata, timeout ); - if (one_packet){ + if (one_packet or metadata.end_of_burst){ #ifdef UHD_TXRX_DEBUG_PRINTS dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); #endif @@ -242,7 +272,9 @@ public: } //first recv had an error code set, return immediately - if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) { + return accum_num_samps; + } //loop until buffer is filled or error code while(accum_num_samps < nsamps_per_buff){ @@ -256,10 +288,16 @@ public: _queue_error_for_next_call = true; break; } + accum_num_samps += num_samps; + + //return immediately if end of burst + if (_queue_metadata.end_of_burst) { + break; + } } #ifdef UHD_TXRX_DEBUG_PRINTS - dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); + dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); #endif return accum_num_samps; } @@ -269,7 +307,7 @@ private: size_t _header_offset_words32; double _tick_rate, _samp_rate; bool _queue_error_for_next_call; - size_t _alignment_faulure_threshold; + size_t _alignment_failure_threshold; rx_metadata_t _queue_metadata; struct xport_chan_props_type{ xport_chan_props_type(void): @@ -283,6 +321,10 @@ private: handle_overflow_type handle_overflow; handle_flowctrl_type handle_flowctrl; size_t fc_update_window; + /////// RFNOC /////////// + bool has_sid; + boost::uint32_t sid; + /////// RFNOC /////////// }; std::vector<xport_chan_props_type> _props; size_t _num_outputs; @@ -355,6 +397,10 @@ private: int recvd_packets; #endif + #ifdef DEVICE3_STREAMER + uhd::rfnoc::rx_stream_terminator::sptr _terminator; + #endif + /******************************************************************* * Get and process a single packet from the transport: * Receive a single packet at the given index. @@ -421,6 +467,7 @@ private: const size_t expected_packet_count = _props[index].packet_count; _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask; if (expected_packet_count != info.ifpi.packet_count){ + //UHD_MSG(status) << "expected: " << expected_packet_count << " got: " << info.ifpi.packet_count << std::endl; if (_props[index].handle_flowctrl) { // Always update flow control in this case, because we don't // know which packet was dropped and what state the upstream @@ -442,6 +489,10 @@ private: void _flush_all(double timeout) { + get_prev_buffer_info().reset(); + get_curr_buffer_info().reset(); + get_next_buffer_info().reset(); + for (size_t i = 0; i < _props.size(); i++) { per_buffer_info_type prev_buffer_info, curr_buffer_info; @@ -462,9 +513,6 @@ private: curr_buffer_info.reset(); } } - get_prev_buffer_info().reset(); - get_curr_buffer_info().reset(); - get_next_buffer_info().reset(); } /******************************************************************* @@ -562,20 +610,27 @@ private: curr_info.metadata.time_spec = next_info[index].time; curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ + // Not sending flow control would cause timeouts due to source flow control locking up. + // Send first as the overrun handler may flush the receive buffers which could contain + // packets with sequence numbers after this packet's sequence number! + if(_props[index].handle_flowctrl) { + _props[index].handle_flowctrl(next_info[index].ifpi.packet_count); + } + rx_metadata_t metadata = curr_info.metadata; _props[index].handle_overflow(); curr_info.metadata = metadata; UHD_MSG(fastpath) << "O"; - - // Not sending flow control would cause timeouts due to source flow control locking up - if(_props[index].handle_flowctrl) { - _props[index].handle_flowctrl(next_info[index].ifpi.packet_count); - } } + curr_info[index].buff.reset(); + curr_info[index].copy_buff = NULL; return; case PACKET_TIMEOUT_ERROR: std::swap(curr_info, next_info); //save progress from curr -> next + if(_props[index].handle_flowctrl) { + _props[index].handle_flowctrl(next_info[index].ifpi.packet_count); + } curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; return; @@ -593,7 +648,7 @@ private: } //too many iterations: detect alignment failure - if (iterations++ > _alignment_faulure_threshold){ + if (iterations++ > _alignment_failure_threshold){ UHD_MSG(error) << boost::format( "The receive packet handler failed to time-align packets.\n" "%u received packets were processed by the handler.\n" @@ -657,7 +712,9 @@ private: _convert_bytes_to_copy = bytes_to_copy; //perform N channels of conversion - converter_thread_task(0); + for (size_t i = 0; i < this->size(); i++) { + convert_to_out_buff(i); + } //update the copy buffer's availability info.data_bytes_to_copy -= bytes_to_copy; @@ -670,15 +727,15 @@ private: return nsamps_to_copy_per_io_buff; } - /******************************************************************* - * Perform one thread's work of the conversion task. - * The entry and exit use a dual synchronization barrier, - * to wait for data to become ready and block until completion. - ******************************************************************/ - UHD_INLINE void converter_thread_task(const size_t index) + /*! Run the conversion from the internal buffers to the user's output + * buffer. + * + * - Calls the converter + * - Releases internal data buffers + * - Updates read/write pointers + */ + inline void convert_to_out_buff(const size_t index) { - _task_barrier.wait(); - //shortcut references to local data structures buffers_info_type &buff_info = get_curr_buffer_info(); per_buffer_info_type &info = buff_info[index]; @@ -702,13 +759,9 @@ private: if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){ info.buff.reset(); //effectively a release } - - if (index == 0) _task_barrier.wait_others(); } //! Shared variables for the worker threads - reusable_barrier _task_barrier; - std::vector<task::sptr> _task_handlers; size_t _convert_nsamps; const rx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index c2810842e..5e81fb442 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -24,11 +24,14 @@ #include <uhd/stream.hpp> #include <uhd/utils/msg.hpp> #include <uhd/utils/tasks.hpp> -#include <uhd/utils/atomic.hpp> #include <uhd/utils/byteswap.hpp> #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> +#ifdef DEVICE3_STREAMER +# include "../rfnoc/tx_stream_terminator.hpp" +#endif +#include <boost/thread/thread.hpp> #include <boost/thread/thread_time.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> @@ -74,22 +77,15 @@ public: } ~send_packet_handler(void){ - _task_barrier.interrupt(); - _task_handlers.clear(); + /* NOP */ } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; - _task_handlers.clear(); _props.resize(size); static const boost::uint64_t zero = 0; _zero_buffs.resize(size, &zero); - _task_barrier.resize(size); - _task_handlers.resize(size); - for (size_t i = 1/*skip 0*/; i < size; i++){ - _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i)); - }; } //! Get the channel width of this handler @@ -109,6 +105,29 @@ public: _props.at(xport_chan).sid = sid; } + ///////// RFNOC /////////////////// + //! Get the stream ID for a specific channel (or zero if no SID) + boost::uint32_t get_xport_chan_sid(const size_t xport_chan) const { + if (_props.at(xport_chan).has_sid) { + return _props.at(xport_chan).sid; + } else { + return 0; + } + } + + #ifdef DEVICE3_STREAMER + void set_terminator(uhd::rfnoc::tx_stream_terminator::sptr terminator) + { + _terminator = terminator; + } + + uhd::rfnoc::tx_stream_terminator::sptr get_terminator() + { + return _terminator; + } + #endif + ///////// RFNOC /////////////////// + void set_enable_trailer(const bool enable) { _has_tlr = enable; @@ -303,6 +322,10 @@ private: bool _cached_metadata; uhd::tx_metadata_t _metadata_cache; + #ifdef DEVICE3_STREAMER + uhd::rfnoc::tx_stream_terminator::sptr _terminator; + #endif + #ifdef UHD_TXRX_DEBUG_PRINTS struct dbg_send_stat_t { dbg_send_stat_t(long wc, size_t nspb, size_t nss, uhd::tx_metadata_t md, double to, double rate): @@ -377,21 +400,23 @@ private: _convert_if_packet_info = &if_packet_info; //perform N channels of conversion - converter_thread_task(0); + for (size_t i = 0; i < this->size(); i++) { + convert_to_in_buff(i); + } _next_packet_seq++; //increment sequence after commits return nsamps_per_buff; } - /******************************************************************* - * Perform one thread's work of the conversion task. - * The entry and exit use a dual synchronization barrier, - * to wait for data to become ready and block until completion. - ******************************************************************/ - UHD_INLINE void converter_thread_task(const size_t index) + /*! Run the conversion from the internal buffers to the user's input + * buffer. + * + * - Calls the converter + * - Releases internal data buffers + * - Updates read/write pointers + */ + UHD_INLINE void convert_to_in_buff(const size_t index) { - _task_barrier.wait(); - //shortcut references to local data structures managed_send_buffer::sptr &buff = _props[index].buff; vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; @@ -419,13 +444,9 @@ private: const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; buff->commit(num_vita_words32*sizeof(boost::uint32_t)); buff.reset(); //effectively a release - - if (index == 0) _task_barrier.wait_others(); } //! Shared variables for the worker threads - reusable_barrier _task_barrier; - std::vector<task::sptr> _task_handlers; size_t _convert_nsamps; const tx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; diff --git a/host/lib/transport/usb_dummy_impl.cpp b/host/lib/transport/usb_dummy_impl.cpp index ce8c306e4..b53b6f590 100644 --- a/host/lib/transport/usb_dummy_impl.cpp +++ b/host/lib/transport/usb_dummy_impl.cpp @@ -31,12 +31,20 @@ std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(boost::u return std::vector<usb_device_handle::sptr>(); //empty list } -usb_control::sptr usb_control::make(usb_device_handle::sptr, const size_t){ +usb_control::sptr usb_control::make( + usb_device_handle::sptr, + const int +) { throw uhd::not_implemented_error("no usb support -> usb_control::make not implemented"); } usb_zero_copy::sptr usb_zero_copy::make( - usb_device_handle::sptr, const size_t, const size_t, const size_t, const size_t, const device_addr_t & + usb_device_handle::sptr, + const int, + const unsigned char, + const int, + const unsigned char, + const device_addr_t & ){ throw uhd::not_implemented_error("no usb support -> usb_zero_copy::make not implemented"); } diff --git a/host/lib/transport/zero_copy_recv_offload.cpp b/host/lib/transport/zero_copy_recv_offload.cpp new file mode 100644 index 000000000..e8b013abc --- /dev/null +++ b/host/lib/transport/zero_copy_recv_offload.cpp @@ -0,0 +1,158 @@ +// +// Copyright 2016 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/zero_copy_recv_offload.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/safe_call.hpp> +#include <boost/format.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/thread.hpp> +#include <boost/bind.hpp> + +using namespace uhd; +using namespace uhd::transport; + +typedef bounded_buffer<managed_recv_buffer::sptr> bounded_buffer_t; + +/*********************************************************************** + * Zero copy offload transport: + * An intermediate transport that utilizes threading to free + * the main thread from any receive work. + **********************************************************************/ +class zero_copy_recv_offload_impl : public zero_copy_recv_offload { +public: + typedef boost::shared_ptr<zero_copy_recv_offload_impl> sptr; + + zero_copy_recv_offload_impl(zero_copy_if::sptr transport, + const double timeout) : + _transport(transport), _timeout(timeout), + _inbox(transport->get_num_recv_frames()), + _recv_done(false) + { + UHD_LOG << "Created threaded transport" << std::endl; + + // Create the receive and send threads to offload + // the system calls onto other threads + _recv_thread = boost::thread( + boost::bind(&zero_copy_recv_offload_impl::enqueue_recv, this) + ); + } + + // Receive thread flags + void set_recv_done() + { + boost::lock_guard<boost::mutex> guard(_recv_mutex); + _recv_done = true; + } + + bool is_recv_done() + { + boost::lock_guard<boost::mutex> guard(_recv_mutex); + return _recv_done; + } + + ~zero_copy_recv_offload_impl() + { + // Signal the threads we're finished + set_recv_done(); + + // Wait for them to join + UHD_SAFE_CALL( + _recv_thread.join(); + ) + } + + // The receive thread function is responsible for + // pulling pointers to managed receiver buffers quickly + void enqueue_recv() + { + while (not is_recv_done()) { + managed_recv_buffer::sptr buff = _transport->get_recv_buff(_timeout); + if (not buff) continue; + _inbox.push_with_timed_wait(buff, _timeout); + } + } + + /******************************************************************* + * Receive implementation: + * Pop the receive buffer pointer from the underlying transport + ******************************************************************/ + managed_recv_buffer::sptr get_recv_buff(double timeout) + { + managed_recv_buffer::sptr ptr; + _inbox.pop_with_timed_wait(ptr, timeout); + return ptr; + } + + size_t get_num_recv_frames() const + { + return _transport->get_num_recv_frames(); + } + + size_t get_recv_frame_size() const + { + return _transport->get_recv_frame_size(); + } + + /******************************************************************* + * Send implementation: + * Pass the send buffer pointer from the underlying transport + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double timeout) + { + return _transport->get_send_buff(timeout); + } + + size_t get_num_send_frames() const + { + return _transport->get_num_send_frames(); + } + + size_t get_send_frame_size() const + { + return _transport->get_send_frame_size(); + } + +private: + // The linked transport + zero_copy_if::sptr _transport; + + const double _timeout; + + // Shared buffers + bounded_buffer_t _inbox; + + // Threading + bool _recv_done; + boost::thread _recv_thread; + boost::mutex _recv_mutex; +}; + +zero_copy_recv_offload::sptr zero_copy_recv_offload::make( + zero_copy_if::sptr transport, + const double timeout) +{ + zero_copy_recv_offload_impl::sptr zero_copy_recv_offload( + new zero_copy_recv_offload_impl(transport, timeout) + ); + + return zero_copy_recv_offload; +} |