aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/libusb1_base.cpp2
-rw-r--r--host/lib/transport/libusb1_control.cpp3
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp34
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp113
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp11
5 files changed, 57 insertions, 106 deletions
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
index 6d4df7875..9b4290c08 100644
--- a/host/lib/transport/libusb1_base.cpp
+++ b/host/lib/transport/libusb1_base.cpp
@@ -206,7 +206,7 @@ libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::spt
handles[dev->get()] = new_handle;
return new_handle;
}
- catch(const uhd::exception &e){
+ catch(const uhd::exception &){
std::cerr << "USB open failed: see the application notes for your device." << std::endl;
throw;
}
diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp
index f903907d0..bce3d4b0b 100644
--- a/host/lib/transport/libusb1_control.cpp
+++ b/host/lib/transport/libusb1_control.cpp
@@ -17,6 +17,7 @@
#include "libusb1_base.hpp"
#include <uhd/transport/usb_control.hpp>
+#include <boost/thread/mutex.hpp>
using namespace uhd::transport;
@@ -40,6 +41,7 @@ public:
unsigned char *buff,
boost::uint16_t length
){
+ boost::mutex::scoped_lock lock(_mutex);
return libusb_control_transfer(_handle->get(),
request_type,
request,
@@ -52,6 +54,7 @@ public:
private:
libusb::device_handle::sptr _handle;
+ boost::mutex _mutex;
};
/***********************************************************************
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index f781f890d..0fa856d34 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -21,11 +21,11 @@
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/exception.hpp>
#include <boost/function.hpp>
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
-#include <boost/thread/barrier.hpp>
#include <list>
using namespace uhd;
@@ -202,12 +202,10 @@ public:
}
//spawn the event handler threads
- size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
- boost::barrier spawn_barrier(concurrency+1);
- for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
- boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier))
- );
- spawn_barrier.wait();
+ const size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
+ for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make(
+ boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
+ ));
}
~libusb_zero_copy_impl(void){
@@ -221,9 +219,6 @@ public:
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
}
- //shutdown the threads
- _thread_group.interrupt_all();
- _thread_group.join_all();
}
managed_recv_buffer::sptr get_recv_buff(double timeout){
@@ -275,20 +270,17 @@ private:
std::list<libusb_transfer *> _all_luts;
//! event handler threads
- boost::thread_group _thread_group;
+ std::list<task::sptr> _event_loop_tasks;
- void run_event_loop(boost::barrier &spawn_barrier){
- spawn_barrier.wait();
+ void run_event_loop(void){
set_thread_priority_safe();
libusb_context *context = libusb::session::get_global_session()->get_context();
- try{
- while (not boost::this_thread::interruption_requested()){
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 100000; //100ms
- libusb_handle_events_timeout(context, &tv);
- }
- } catch(const boost::thread_interrupted &){}
+ while (not boost::this_thread::interruption_requested()){
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000; //100ms
+ libusb_handle_events_timeout(context, &tv);
+ }
}
};
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 80ad17b6c..15bd78242 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -30,6 +30,7 @@
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
#include <boost/thread/mutex.hpp>
+#include <boost/dynamic_bitset.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
@@ -51,61 +52,6 @@ typedef boost::function<void(void)> handle_overflow_type;
static inline void handle_overflow_nop(void){}
/***********************************************************************
- * Alignment indexes class:
- * - Access an integer set with very quick operations.
- **********************************************************************/
-class alignment_indexes{
-public:
- typedef boost::uint16_t index_type; //16 buffers
-
- alignment_indexes(void):
- _indexes(0),
- _sizes(256, 0),
- _fronts(256, ~0)
- {
- //fill the O(1) look up tables for a single byte
- for (size_t i = 0; i < 256; i++){
- for (size_t j = 0; j < 8; j++){
- if (i & (1 << j)){
- _sizes[i]++;
- _fronts[i] = j;
- }
- }
- }
- }
-
- UHD_INLINE void reset(size_t len){_indexes = (1 << len) - 1;}
-
- UHD_INLINE size_t front(void){
- //check one byte per iteration
- for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){
- size_t front = _fronts[(_indexes >> i) & 0xff];
- if (front != size_t(~0)) return front + i;
- }
- if (empty()) throw uhd::runtime_error("cannot call front() when empty");
- UHD_THROW_INVALID_CODE_PATH();
- }
-
- UHD_INLINE void remove(size_t index){_indexes &= ~(1 << index);}
-
- UHD_INLINE bool empty(void){return _indexes == 0;}
-
- UHD_INLINE size_t size(void){
- size_t size = 0;
- //check one byte per iteration
- for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){
- size += _sizes[(_indexes >> i) & 0xff];
- }
- return size;
- }
-
-private:
- index_type _indexes;
- std::vector<size_t> _sizes;
- std::vector<size_t> _fronts;
-};
-
-/***********************************************************************
* Super receive packet handler
*
* A receive packet handler represents a group of channels.
@@ -126,9 +72,9 @@ public:
_queue_error_for_next_call(false),
_buffers_infos_index(0)
{
- UHD_ASSERT_THROW(size <= sizeof(alignment_indexes::index_type)*8);
this->resize(size);
set_alignment_failure_threshold(1000);
+ this->set_scale_factor(1/32767.);
}
//! Resize the number of transport channels
@@ -208,6 +154,11 @@ public:
return boost::mutex::scoped_lock(_mutex);
}
+ //! Set the scale factor used in float conversion
+ void set_scale_factor(const double scale_factor){
+ _scale_factor = scale_factor;
+ }
+
/*******************************************************************
* Receive:
* The entry point for the fast-path receive calls.
@@ -293,6 +244,7 @@ private:
std::vector<void *> _io_buffs; //used in conversion
size_t _bytes_per_item; //used in conversion
std::vector<uhd::convert::function_type> _converters; //used in conversion
+ double _scale_factor;
//! information stored for a received buffer
struct per_buffer_info_type{
@@ -307,13 +259,12 @@ private:
struct buffers_info_type : std::vector<per_buffer_info_type> {
buffers_info_type(const size_t size):
std::vector<per_buffer_info_type>(size),
+ indexes_todo(size, true),
alignment_time_valid(false),
data_bytes_to_copy(0),
fragment_offset_in_samps(0)
- {
- indexes_to_do.reset(size);
- }
- alignment_indexes indexes_to_do; //used in alignment logic
+ {/* NOP */}
+ boost::dynamic_bitset<> indexes_todo; //used in alignment logic
time_spec_t alignment_time; //used in alignment logic
bool alignment_time_valid; //used in alignment logic
size_t data_bytes_to_copy; //keeps track of state
@@ -369,34 +320,30 @@ private:
info.time = time_spec_t(time_t(info.ifpi.tsi), size_t(info.ifpi.tsf), _tick_rate); //assumes has_tsi and has_tsf are true
info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
- //store the packet count for the next iteration
- #ifndef SRPH_DONT_CHECK_SEQUENCE
- const size_t expected_packet_count = _props[index].packet_count;
- _props[index].packet_count = (info.ifpi.packet_count + 1)%16;
- #endif
-
//--------------------------------------------------------------
//-- Determine return conditions:
//-- The order of these checks is HOLY.
//--------------------------------------------------------------
- //1) check for out of order timestamps
- if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){
- return PACKET_TIMESTAMP_ERROR;
- }
-
- //2) check for inline IF message packets
+ //1) check for inline IF message packets
if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
return PACKET_INLINE_MESSAGE;
}
- //3) check for sequence errors
+ //2) check for sequence errors
#ifndef SRPH_DONT_CHECK_SEQUENCE
+ const size_t expected_packet_count = _props[index].packet_count;
+ _props[index].packet_count = (info.ifpi.packet_count + 1)%16;
if (expected_packet_count != info.ifpi.packet_count){
return PACKET_SEQUENCE_ERROR;
}
#endif
+ //3) check for out of order timestamps
+ if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){
+ return PACKET_TIMESTAMP_ERROR;
+ }
+
//4) otherwise the packet is normal!
return PACKET_IF_DATA;
}
@@ -414,15 +361,15 @@ private:
if (not info.alignment_time_valid or info[index].time > info.alignment_time){
info.alignment_time_valid = true;
info.alignment_time = info[index].time;
- info.indexes_to_do.reset(this->size());
- info.indexes_to_do.remove(index);
+ info.indexes_todo.set();
+ info.indexes_todo.reset(index);
info.data_bytes_to_copy = info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t);
}
//if the sequence id matches:
// remove this index from the list and continue
else if (info[index].time == info.alignment_time){
- info.indexes_to_do.remove(index);
+ info.indexes_todo.reset(index);
}
//if the sequence id is older:
@@ -448,10 +395,10 @@ private:
// - Handle the packet type yielded by the receive.
// - Check the timestamps for alignment conditions.
size_t iterations = 0;
- while (not curr_info.indexes_to_do.empty()){
+ while (curr_info.indexes_todo.any()){
//get the index to process for this iteration
- const size_t index = curr_info.indexes_to_do.front();
+ const size_t index = curr_info.indexes_todo.find_first();
packet_type packet;
//receive a single packet from the transport
@@ -502,8 +449,10 @@ private:
curr_info.metadata.start_of_burst = false;
curr_info.metadata.end_of_burst = false;
curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
- if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW) _props[index].handle_overflow();
- UHD_MSG(fastpath) << "O";
+ if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
+ _props[index].handle_overflow();
+ UHD_MSG(fastpath) << "O";
+ }
return;
case PACKET_TIMEOUT_ERROR:
@@ -589,7 +538,7 @@ private:
//reset current buffer info members for reuse
get_curr_buffer_info().fragment_offset_in_samps = 0;
get_curr_buffer_info().alignment_time_valid = false;
- get_curr_buffer_info().indexes_to_do.reset(this->size());
+ get_curr_buffer_info().indexes_todo.set();
//perform receive with alignment logic
get_aligned_buffs(timeout);
@@ -616,7 +565,7 @@ private:
}
//copy-convert the samples from the recv buffer
- _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, 1/32767.);
+ _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, _scale_factor);
//update the rx copy buffer to reflect the bytes copied
buff_info.copy_buff += bytes_to_copy;
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 8ebc264ef..d5d9e6fe3 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -58,6 +58,7 @@ public:
_next_packet_seq(0)
{
this->resize(size);
+ this->set_scale_factor(32767.);
}
//! Resize the number of transport channels
@@ -132,6 +133,11 @@ public:
return boost::mutex::scoped_lock(_mutex);
}
+ //! Set the scale factor used in float conversion
+ void set_scale_factor(const double scale_factor){
+ _scale_factor = scale_factor;
+ }
+
/*******************************************************************
* Send:
* The entry point for the fast-path send calls.
@@ -238,11 +244,12 @@ private:
size_t _max_samples_per_packet;
std::vector<const void *> _zero_buffs;
size_t _next_packet_seq;
+ double _scale_factor;
/*******************************************************************
* Send a single packet:
******************************************************************/
- size_t send_one_packet(
+ UHD_INLINE size_t send_one_packet(
const uhd::device::send_buffs_type &buffs,
const size_t nsamps_per_buff,
vrt::if_packet_info_t &if_packet_info,
@@ -270,7 +277,7 @@ private:
otw_mem += if_packet_info.num_header_words32;
//copy-convert the samples into the send buffer
- _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, 32767.);
+ _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, _scale_factor);
//commit the samples to the zero-copy interface
size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);