diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/libusb1_base.cpp | 2 | ||||
-rw-r--r-- | host/lib/transport/libusb1_control.cpp | 3 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 34 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 113 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 11 |
5 files changed, 57 insertions, 106 deletions
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index 6d4df7875..9b4290c08 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -206,7 +206,7 @@ libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::spt handles[dev->get()] = new_handle; return new_handle; } - catch(const uhd::exception &e){ + catch(const uhd::exception &){ std::cerr << "USB open failed: see the application notes for your device." << std::endl; throw; } diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp index f903907d0..bce3d4b0b 100644 --- a/host/lib/transport/libusb1_control.cpp +++ b/host/lib/transport/libusb1_control.cpp @@ -17,6 +17,7 @@ #include "libusb1_base.hpp" #include <uhd/transport/usb_control.hpp> +#include <boost/thread/mutex.hpp> using namespace uhd::transport; @@ -40,6 +41,7 @@ public: unsigned char *buff, boost::uint16_t length ){ + boost::mutex::scoped_lock lock(_mutex); return libusb_control_transfer(_handle->get(), request_type, request, @@ -52,6 +54,7 @@ public: private: libusb::device_handle::sptr _handle; + boost::mutex _mutex; }; /*********************************************************************** diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f781f890d..0fa856d34 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -21,11 +21,11 @@ #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/thread_priority.hpp> #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp> #include <uhd/exception.hpp> #include <boost/function.hpp> #include <boost/foreach.hpp> #include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp> #include <list> using namespace uhd; @@ -202,12 +202,10 @@ public: } //spawn the event handler threads - size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); - boost::barrier spawn_barrier(concurrency+1); - for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( - boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier)) - ); - spawn_barrier.wait(); + const size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); + for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make( + boost::bind(&libusb_zero_copy_impl::run_event_loop, this) + )); } ~libusb_zero_copy_impl(void){ @@ -221,9 +219,6 @@ public: boost::this_thread::sleep(boost::posix_time::milliseconds(10)); } } - //shutdown the threads - _thread_group.interrupt_all(); - _thread_group.join_all(); } managed_recv_buffer::sptr get_recv_buff(double timeout){ @@ -275,20 +270,17 @@ private: std::list<libusb_transfer *> _all_luts; //! event handler threads - boost::thread_group _thread_group; + std::list<task::sptr> _event_loop_tasks; - void run_event_loop(boost::barrier &spawn_barrier){ - spawn_barrier.wait(); + void run_event_loop(void){ set_thread_priority_safe(); libusb_context *context = libusb::session::get_global_session()->get_context(); - try{ - while (not boost::this_thread::interruption_requested()){ - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 100000; //100ms - libusb_handle_events_timeout(context, &tv); - } - } catch(const boost::thread_interrupted &){} + while (not boost::this_thread::interruption_requested()){ + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100000; //100ms + libusb_handle_events_timeout(context, &tv); + } } }; diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 80ad17b6c..15bd78242 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -30,6 +30,7 @@ #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> #include <boost/thread/mutex.hpp> +#include <boost/dynamic_bitset.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> #include <boost/format.hpp> @@ -51,61 +52,6 @@ typedef boost::function<void(void)> handle_overflow_type; static inline void handle_overflow_nop(void){} /*********************************************************************** - * Alignment indexes class: - * - Access an integer set with very quick operations. - **********************************************************************/ -class alignment_indexes{ -public: - typedef boost::uint16_t index_type; //16 buffers - - alignment_indexes(void): - _indexes(0), - _sizes(256, 0), - _fronts(256, ~0) - { - //fill the O(1) look up tables for a single byte - for (size_t i = 0; i < 256; i++){ - for (size_t j = 0; j < 8; j++){ - if (i & (1 << j)){ - _sizes[i]++; - _fronts[i] = j; - } - } - } - } - - UHD_INLINE void reset(size_t len){_indexes = (1 << len) - 1;} - - UHD_INLINE size_t front(void){ - //check one byte per iteration - for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ - size_t front = _fronts[(_indexes >> i) & 0xff]; - if (front != size_t(~0)) return front + i; - } - if (empty()) throw uhd::runtime_error("cannot call front() when empty"); - UHD_THROW_INVALID_CODE_PATH(); - } - - UHD_INLINE void remove(size_t index){_indexes &= ~(1 << index);} - - UHD_INLINE bool empty(void){return _indexes == 0;} - - UHD_INLINE size_t size(void){ - size_t size = 0; - //check one byte per iteration - for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ - size += _sizes[(_indexes >> i) & 0xff]; - } - return size; - } - -private: - index_type _indexes; - std::vector<size_t> _sizes; - std::vector<size_t> _fronts; -}; - -/*********************************************************************** * Super receive packet handler * * A receive packet handler represents a group of channels. @@ -126,9 +72,9 @@ public: _queue_error_for_next_call(false), _buffers_infos_index(0) { - UHD_ASSERT_THROW(size <= sizeof(alignment_indexes::index_type)*8); this->resize(size); set_alignment_failure_threshold(1000); + this->set_scale_factor(1/32767.); } //! Resize the number of transport channels @@ -208,6 +154,11 @@ public: return boost::mutex::scoped_lock(_mutex); } + //! Set the scale factor used in float conversion + void set_scale_factor(const double scale_factor){ + _scale_factor = scale_factor; + } + /******************************************************************* * Receive: * The entry point for the fast-path receive calls. @@ -293,6 +244,7 @@ private: std::vector<void *> _io_buffs; //used in conversion size_t _bytes_per_item; //used in conversion std::vector<uhd::convert::function_type> _converters; //used in conversion + double _scale_factor; //! information stored for a received buffer struct per_buffer_info_type{ @@ -307,13 +259,12 @@ private: struct buffers_info_type : std::vector<per_buffer_info_type> { buffers_info_type(const size_t size): std::vector<per_buffer_info_type>(size), + indexes_todo(size, true), alignment_time_valid(false), data_bytes_to_copy(0), fragment_offset_in_samps(0) - { - indexes_to_do.reset(size); - } - alignment_indexes indexes_to_do; //used in alignment logic + {/* NOP */} + boost::dynamic_bitset<> indexes_todo; //used in alignment logic time_spec_t alignment_time; //used in alignment logic bool alignment_time_valid; //used in alignment logic size_t data_bytes_to_copy; //keeps track of state @@ -369,34 +320,30 @@ private: info.time = time_spec_t(time_t(info.ifpi.tsi), size_t(info.ifpi.tsf), _tick_rate); //assumes has_tsi and has_tsf are true info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); - //store the packet count for the next iteration - #ifndef SRPH_DONT_CHECK_SEQUENCE - const size_t expected_packet_count = _props[index].packet_count; - _props[index].packet_count = (info.ifpi.packet_count + 1)%16; - #endif - //-------------------------------------------------------------- //-- Determine return conditions: //-- The order of these checks is HOLY. //-------------------------------------------------------------- - //1) check for out of order timestamps - if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ - return PACKET_TIMESTAMP_ERROR; - } - - //2) check for inline IF message packets + //1) check for inline IF message packets if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ return PACKET_INLINE_MESSAGE; } - //3) check for sequence errors + //2) check for sequence errors #ifndef SRPH_DONT_CHECK_SEQUENCE + const size_t expected_packet_count = _props[index].packet_count; + _props[index].packet_count = (info.ifpi.packet_count + 1)%16; if (expected_packet_count != info.ifpi.packet_count){ return PACKET_SEQUENCE_ERROR; } #endif + //3) check for out of order timestamps + if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ + return PACKET_TIMESTAMP_ERROR; + } + //4) otherwise the packet is normal! return PACKET_IF_DATA; } @@ -414,15 +361,15 @@ private: if (not info.alignment_time_valid or info[index].time > info.alignment_time){ info.alignment_time_valid = true; info.alignment_time = info[index].time; - info.indexes_to_do.reset(this->size()); - info.indexes_to_do.remove(index); + info.indexes_todo.set(); + info.indexes_todo.reset(index); info.data_bytes_to_copy = info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t); } //if the sequence id matches: // remove this index from the list and continue else if (info[index].time == info.alignment_time){ - info.indexes_to_do.remove(index); + info.indexes_todo.reset(index); } //if the sequence id is older: @@ -448,10 +395,10 @@ private: // - Handle the packet type yielded by the receive. // - Check the timestamps for alignment conditions. size_t iterations = 0; - while (not curr_info.indexes_to_do.empty()){ + while (curr_info.indexes_todo.any()){ //get the index to process for this iteration - const size_t index = curr_info.indexes_to_do.front(); + const size_t index = curr_info.indexes_todo.find_first(); packet_type packet; //receive a single packet from the transport @@ -502,8 +449,10 @@ private: curr_info.metadata.start_of_burst = false; curr_info.metadata.end_of_burst = false; curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); - if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW) _props[index].handle_overflow(); - UHD_MSG(fastpath) << "O"; + if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){ + _props[index].handle_overflow(); + UHD_MSG(fastpath) << "O"; + } return; case PACKET_TIMEOUT_ERROR: @@ -589,7 +538,7 @@ private: //reset current buffer info members for reuse get_curr_buffer_info().fragment_offset_in_samps = 0; get_curr_buffer_info().alignment_time_valid = false; - get_curr_buffer_info().indexes_to_do.reset(this->size()); + get_curr_buffer_info().indexes_todo.set(); //perform receive with alignment logic get_aligned_buffs(timeout); @@ -616,7 +565,7 @@ private: } //copy-convert the samples from the recv buffer - _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, 1/32767.); + _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, _scale_factor); //update the rx copy buffer to reflect the bytes copied buff_info.copy_buff += bytes_to_copy; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 8ebc264ef..d5d9e6fe3 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -58,6 +58,7 @@ public: _next_packet_seq(0) { this->resize(size); + this->set_scale_factor(32767.); } //! Resize the number of transport channels @@ -132,6 +133,11 @@ public: return boost::mutex::scoped_lock(_mutex); } + //! Set the scale factor used in float conversion + void set_scale_factor(const double scale_factor){ + _scale_factor = scale_factor; + } + /******************************************************************* * Send: * The entry point for the fast-path send calls. @@ -238,11 +244,12 @@ private: size_t _max_samples_per_packet; std::vector<const void *> _zero_buffs; size_t _next_packet_seq; + double _scale_factor; /******************************************************************* * Send a single packet: ******************************************************************/ - size_t send_one_packet( + UHD_INLINE size_t send_one_packet( const uhd::device::send_buffs_type &buffs, const size_t nsamps_per_buff, vrt::if_packet_info_t &if_packet_info, @@ -270,7 +277,7 @@ private: otw_mem += if_packet_info.num_header_words32; //copy-convert the samples into the send buffer - _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, 32767.); + _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, _scale_factor); //commit the samples to the zero-copy interface size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); |