summaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/simple_claimer.hpp64
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp93
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp102
-rw-r--r--host/lib/transport/udp_zero_copy.cpp2
-rw-r--r--host/lib/transport/usb_zero_copy_wrapper.cpp3
5 files changed, 151 insertions, 113 deletions
diff --git a/host/lib/transport/simple_claimer.hpp b/host/lib/transport/simple_claimer.hpp
deleted file mode 100644
index 3bbc49a05..000000000
--- a/host/lib/transport/simple_claimer.hpp
+++ /dev/null
@@ -1,64 +0,0 @@
-//
-// Copyright 2012 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#ifndef INCLUDED_LIBUHD_TRANSPORT_SIMPLE_CLAIMER_HPP
-#define INCLUDED_LIBUHD_TRANSPORT_SIMPLE_CLAIMER_HPP
-
-#include <uhd/config.hpp>
-#include <boost/thread/condition.hpp>
-#include <boost/thread/mutex.hpp>
-
-namespace uhd{ namespace transport{
-
-/***********************************************************************
- * Claimer class to provide synchronization for multi-thread access.
- * Claiming enables buffer classes to be used with a buffer queue.
- **********************************************************************/
-class simple_claimer{
-public:
- simple_claimer(void){
- this->release();
- }
-
- UHD_INLINE void release(void){
- boost::mutex::scoped_lock lock(_mutex);
- _locked = false;
- lock.unlock();
- _cond.notify_one();
- }
-
- UHD_INLINE bool claim_with_wait(const double timeout){
- boost::mutex::scoped_lock lock(_mutex);
- while (_locked){
- if (not _cond.timed_wait(lock, boost::posix_time::microseconds(long(timeout*1e6)))){
- break;
- }
- }
- const bool ret = not _locked;
- _locked = true;
- return ret;
- }
-
-private:
- bool _locked;
- boost::mutex _mutex;
- boost::condition_variable _cond;
-};
-
-}} //namespace uhd::transport
-
-#endif /* INCLUDED_LIBUHD_TRANSPORT_SIMPLE_CLAIMER_HPP */
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 205c7a3a3..4b96199e2 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -23,6 +23,8 @@
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
@@ -31,6 +33,9 @@
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/barrier.hpp>
#include <iostream>
#include <vector>
@@ -73,12 +78,23 @@ public:
set_alignment_failure_threshold(1000);
}
+ ~recv_packet_handler(void){
+ _task_handlers.clear();
+ }
+
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
+ _task_handlers.clear();
_props.resize(size);
//re-initialize all buffers infos by re-creating the vector
_buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
+ _task_barrier_entry.resize(size);
+ _task_barrier_exit.resize(size);
+ _task_handlers.resize(size);
+ for (size_t i = 1/*skip 0*/; i < size; i++){
+ _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
+ };
}
//! Get the channel width of this handler
@@ -125,7 +141,7 @@ public:
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
- _io_buffs.resize(id.num_outputs);
+ _num_outputs = id.num_outputs;
_converter = uhd::convert::get_converter(id)();
this->set_scale_factor(1/32767.); //update after setting converter
_bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format);
@@ -207,7 +223,7 @@ private:
handle_overflow_type handle_overflow;
};
std::vector<xport_chan_props_type> _props;
- std::vector<void *> _io_buffs; //used in conversion
+ size_t _num_outputs;
size_t _bytes_per_otw_item; //used in conversion
size_t _bytes_per_cpu_item; //used in conversion
uhd::convert::converter::sptr _converter; //used in conversion
@@ -512,24 +528,19 @@ private:
//extract the number of samples available to copy
const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item;
- const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available);
+ const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available);
const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item;
- const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size();
+ const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs;
- size_t buff_index = 0;
- BOOST_FOREACH(per_buffer_info_type &buff_info, info){
+ //setup the data to share with converter threads
+ _convert_nsamps = nsamps_to_copy_per_io_buff;
+ _convert_buffs = &buffs;
+ _convert_buffer_offset_bytes = buffer_offset_bytes;
+ _convert_bytes_to_copy = bytes_to_copy;
- //fill a vector with pointers to the io buffers
- BOOST_FOREACH(void *&io_buff, _io_buffs){
- io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes;
- }
+ //perform N channels of conversion
+ converter_thread_task(0);
- //copy-convert the samples from the recv buffer
- _converter->conv(buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff);
-
- //update the rx copy buffer to reflect the bytes copied
- buff_info.copy_buff += bytes_to_copy;
- }
//update the copy buffer's availability
info.data_bytes_to_copy -= bytes_to_copy;
@@ -538,15 +549,53 @@ private:
metadata.fragment_offset = info.fragment_offset_in_samps;
info.fragment_offset_in_samps += nsamps_to_copy; //set for next call
- //done with buffers? this action releases buffers in-order
- if (not metadata.more_fragments){
- BOOST_FOREACH(per_buffer_info_type &buff_info, info){
- buff_info.buff.reset(); //effectively a release
- }
+ return nsamps_to_copy_per_io_buff;
+ }
+
+ /*******************************************************************
+ * Perform one thread's work of the conversion task.
+ * The entry and exit use a dual synchronization barrier,
+ * to wait for data to become ready and block until completion.
+ ******************************************************************/
+ UHD_INLINE void converter_thread_task(const size_t index)
+ {
+ _task_barrier_entry.wait();
+
+ //shortcut references to local data structures
+ buffers_info_type &buff_info = get_curr_buffer_info();
+ per_buffer_info_type &info = buff_info[index];
+ const rx_streamer::buffs_type &buffs = *_convert_buffs;
+
+ //fill IO buffs with pointers into the output buffer
+ void *io_buffs[4/*max interleave*/];
+ for (size_t i = 0; i < _num_outputs; i++){
+ char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]);
+ io_buffs[i] = b + _convert_buffer_offset_bytes;
}
+ const ref_vector<void *> out_buffs(io_buffs, _num_outputs);
- return nsamps_to_copy_per_io_buff;
+ //perform the conversion operation
+ _converter->conv(info.copy_buff, out_buffs, _convert_nsamps);
+
+ //advance the pointer for the source buffer
+ info.copy_buff += _convert_bytes_to_copy;
+
+ //release the buffer if fully consumed
+ if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
+ info.buff.reset(); //effectively a release
+ }
+
+ _task_barrier_exit.wait();
}
+
+ //! Shared variables for the worker threads
+ reusable_barrier _task_barrier_entry, _task_barrier_exit;
+ std::vector<task::sptr> _task_handlers;
+ size_t _convert_nsamps;
+ const rx_streamer::buffs_type *_convert_buffs;
+ size_t _convert_buffer_offset_bytes;
+ size_t _convert_bytes_to_copy;
+
};
class recv_packet_streamer : public recv_packet_handler, public rx_streamer{
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 02cfad80f..8f943effb 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -23,6 +23,8 @@
#include <uhd/convert.hpp>
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
@@ -58,12 +60,23 @@ public:
this->resize(size);
}
+ ~send_packet_handler(void){
+ _task_handlers.clear();
+ }
+
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
+ _task_handlers.clear();
_props.resize(size);
static const boost::uint64_t zero = 0;
_zero_buffs.resize(size, &zero);
+ _task_barrier_entry.resize(size);
+ _task_barrier_exit.resize(size);
+ _task_handlers.resize(size);
+ for (size_t i = 1/*skip 0*/; i < size; i++){
+ _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
+ };
}
//! Get the channel width of this handler
@@ -104,7 +117,7 @@ public:
//! Set the conversion routine for all channels
void set_converter(const uhd::convert::id_type &id){
- _io_buffs.resize(id.num_inputs);
+ _num_inputs = id.num_inputs;
_converter = uhd::convert::get_converter(id)();
this->set_scale_factor(32767.); //update after setting converter
_bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.output_format);
@@ -205,9 +218,10 @@ private:
get_buff_type get_buff;
bool has_sid;
boost::uint32_t sid;
+ managed_send_buffer::sptr buff;
};
std::vector<xport_chan_props_type> _props;
- std::vector<const void *> _io_buffs; //used in conversion
+ size_t _num_inputs;
size_t _bytes_per_otw_item; //used in conversion
size_t _bytes_per_cpu_item; //used in conversion
uhd::convert::converter::sptr _converter; //used in conversion
@@ -226,39 +240,77 @@ private:
const size_t buffer_offset_bytes = 0
){
//load the rest of the if_packet_info in here
- if_packet_info.num_payload_bytes = nsamps_per_buff*_io_buffs.size()*_bytes_per_otw_item;
+ if_packet_info.num_payload_bytes = nsamps_per_buff*_num_inputs*_bytes_per_otw_item;
if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(boost::uint32_t);
if_packet_info.packet_count = _next_packet_seq;
- size_t buff_index = 0;
+ //get a buffer for each channel or timeout
BOOST_FOREACH(xport_chan_props_type &props, _props){
- managed_send_buffer::sptr buff = props.get_buff(timeout);
- if (buff.get() == NULL) return 0; //timeout
-
- //fill a vector with pointers to the io buffers
- BOOST_FOREACH(const void *&io_buff, _io_buffs){
- io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes;
- }
- boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32;
-
- //pack metadata into a vrt header
- if_packet_info.has_sid = props.has_sid;
- if_packet_info.sid = props.sid;
- _vrt_packer(otw_mem, if_packet_info);
- otw_mem += if_packet_info.num_header_words32;
+ if (not props.buff) props.buff = props.get_buff(timeout);
+ if (not props.buff) return 0; //timeout
+ }
- //copy-convert the samples into the send buffer
- _converter->conv(_io_buffs, otw_mem, nsamps_per_buff);
+ //setup the data to share with converter threads
+ _convert_nsamps = nsamps_per_buff;
+ _convert_buffs = &buffs;
+ _convert_buffer_offset_bytes = buffer_offset_bytes;
+ _convert_if_packet_info = &if_packet_info;
- //commit the samples to the zero-copy interface
- size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);
- buff->commit(num_bytes_total);
- buff.reset(); //effectively a release
+ //perform N channels of conversion
+ converter_thread_task(0);
- }
_next_packet_seq++; //increment sequence after commits
return nsamps_per_buff;
}
+
+ /*******************************************************************
+ * Perform one thread's work of the conversion task.
+ * The entry and exit use a dual synchronization barrier,
+ * to wait for data to become ready and block until completion.
+ ******************************************************************/
+ UHD_INLINE void converter_thread_task(const size_t index)
+ {
+ _task_barrier_entry.wait();
+
+ //shortcut references to local data structures
+ managed_send_buffer::sptr &buff = _props[index].buff;
+ vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
+ const tx_streamer::buffs_type &buffs = *_convert_buffs;
+
+ //fill IO buffs with pointers into the output buffer
+ const void *io_buffs[4/*max interleave*/];
+ for (size_t i = 0; i < _num_inputs; i++){
+ const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]);
+ io_buffs[i] = b + _convert_buffer_offset_bytes;
+ }
+ const ref_vector<const void *> in_buffs(io_buffs, _num_inputs);
+
+ //pack metadata into a vrt header
+ boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32;
+ if_packet_info.has_sid = _props[index].has_sid;
+ if_packet_info.sid = _props[index].sid;
+ _vrt_packer(otw_mem, if_packet_info);
+ otw_mem += if_packet_info.num_header_words32;
+
+ //perform the conversion operation
+ _converter->conv(in_buffs, otw_mem, _convert_nsamps);
+
+ //commit the samples to the zero-copy interface
+ const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
+ buff->commit(num_vita_words32*sizeof(boost::uint32_t));
+ buff.reset(); //effectively a release
+
+ _task_barrier_exit.wait();
+ }
+
+ //! Shared variables for the worker threads
+ reusable_barrier _task_barrier_entry, _task_barrier_exit;
+ std::vector<task::sptr> _task_handlers;
+ size_t _convert_nsamps;
+ const tx_streamer::buffs_type *_convert_buffs;
+ size_t _convert_buffer_offset_bytes;
+ vrt::if_packet_info_t *_convert_if_packet_info;
+
};
class send_packet_streamer : public send_packet_handler, public tx_streamer{
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp
index 9765c19c0..9125be53a 100644
--- a/host/lib/transport/udp_zero_copy.cpp
+++ b/host/lib/transport/udp_zero_copy.cpp
@@ -16,12 +16,12 @@
//
#include "udp_common.hpp"
-#include "simple_claimer.hpp"
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/utils/atomic.hpp>
#include <boost/format.hpp>
#include <boost/make_shared.hpp>
#include <vector>
diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp
index d59ea36ff..d04244ca9 100644
--- a/host/lib/transport/usb_zero_copy_wrapper.cpp
+++ b/host/lib/transport/usb_zero_copy_wrapper.cpp
@@ -15,12 +15,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
-#include "simple_claimer.hpp"
#include <uhd/transport/usb_zero_copy.hpp>
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
#include <boost/foreach.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread/mutex.hpp>
@@ -29,6 +29,7 @@
#include <vector>
#include <iostream>
+using namespace uhd;
using namespace uhd::transport;
static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1));