aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
authorMichael West <michael.west@ettus.com>2017-03-29 13:10:32 -0700
committerMartin Braun <martin.braun@ettus.com>2017-04-05 17:26:28 -0700
commite348353c4f5acef6a5ece11e9c336df4c15d65e1 (patch)
treee2b53bb6f3c539fafa159c9fc4a60cd78b4176e8 /host/lib/transport
parentb32055cbde789fffff78a0311e372ba24f6c8f19 (diff)
downloaduhd-e348353c4f5acef6a5ece11e9c336df4c15d65e1.tar.gz
uhd-e348353c4f5acef6a5ece11e9c336df4c15d65e1.tar.bz2
uhd-e348353c4f5acef6a5ece11e9c336df4c15d65e1.zip
Implement worker threads to offload conversion of data and transport I/O
for send() calls. - One worker thread per channel provides for improved scalability
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp211
1 files changed, 170 insertions, 41 deletions
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 0acc8df4b..431cbf216 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -29,10 +29,13 @@
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
+#include <uhd/utils/safe_call.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/thread_time.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
+#include <boost/atomic.hpp>
+#include <boost/make_shared.hpp>
#include <iostream>
#include <vector>
@@ -49,6 +52,9 @@ namespace uhd {
namespace transport {
namespace sph {
+static const size_t MAX_INTERLEAVE = 4;
+static const double GET_BUFF_TIMEOUT = 0.1;
+
/***********************************************************************
* Super send packet handler
*
@@ -68,19 +74,39 @@ public:
* \param size the number of transport channels
*/
send_packet_handler(const size_t size = 1):
- _next_packet_seq(0), _cached_metadata(false)
+ _next_packet_seq(0), _cached_metadata(false)
{
this->set_enable_trailer(true);
this->resize(size);
}
~send_packet_handler(void){
- /* NOP */
+ UHD_SAFE_CALL(
+ for (size_t i = 0; i < _worker_data.size(); i++)
+ {
+ _worker_data[i]->stop = true;
+ }
+ _worker_thread_group.join_all();
+ );
}
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
+
+ // Stop all worker threads
+ for (size_t i = 0; i < _worker_data.size(); i++)
+ {
+ _worker_data[i]->stop = true;
+ }
+ _worker_thread_group.join_all();
+ _worker_threads.resize(size);
+ _worker_data.resize(size);
+ for (size_t i = 0; i < size; i++)
+ {
+ _worker_data[i] = boost::make_shared<worker_thread_data_t>();
+ }
+
_props.resize(size);
static const uint64_t zero = 0;
_zero_buffs.resize(size, &zero);
@@ -145,7 +171,15 @@ public:
* \param get_buff the getter function
*/
void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){
+ if (_worker_threads[xport_chan])
+ {
+ _worker_thread_group.remove_thread(_worker_threads[xport_chan]);
+ _worker_data[xport_chan]->stop = true;
+ _worker_threads[xport_chan]->join();
+ _worker_data[xport_chan]->stop = false;
+ }
_props.at(xport_chan).get_buff = get_buff;
+ _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan));
}
//! Set the conversion routine for all channels
@@ -381,63 +415,147 @@ private:
if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t);
if_packet_info.packet_count = _next_packet_seq;
- //get a buffer for each channel or timeout
- BOOST_FOREACH(xport_chan_props_type &props, _props){
- if (not props.buff) props.buff = props.get_buff(timeout);
- if (not props.buff) return 0; //timeout
+ // wait for all worker threads to be ready or timeout
+ boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000));
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ while (not _worker_data[i]->ready)
+ {
+ if (boost::get_system_time() > expiration)
+ {
+ return 0;
+ }
+ }
+ _worker_data[i]->ready = false;
}
- //setup the data to share with converter threads
+ //setup the data to share with worker threads
_convert_nsamps = nsamps_per_buff;
_convert_buffs = &buffs;
_convert_buffer_offset_bytes = buffer_offset_bytes;
_convert_if_packet_info = &if_packet_info;
- //perform N channels of conversion
- for (size_t i = 0; i < this->size(); i++) {
- convert_to_in_buff(i);
+ //start N channels of conversion
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ _worker_data[i]->go = true;
+ }
+
+ //make sure any sleeping worker threads are woken up
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ // Acquiring the lock used by the condition variable
+ // takes too long, so do a spin wait. If the go flag
+ // is not cleared by this point, it will be cleared
+ // immediately by the worker thread when it wakes up.
+ while (_worker_data[i]->go)
+ {
+ _worker_data[i]->data_ready.notify_one();
+ }
+ }
+
+ //wait for all worker threads to be done
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ //TODO: Implement a better wait strategy
+ //busy loop give fastest response, but these are just wasted cycles
+ while (not _worker_data[i]->done) {}
+ _worker_data[i]->done = false;
}
_next_packet_seq++; //increment sequence after commits
return nsamps_per_buff;
}
- /*! Run the conversion from the internal buffers to the user's input
- * buffer.
+ /*! Worker thread routine.
*
+ * - Gets an internal data buffer
* - Calls the converter
* - Releases internal data buffers
- * - Updates read/write pointers
*/
- UHD_INLINE void convert_to_in_buff(const size_t index)
+ void worker(const size_t index)
{
- //shortcut references to local data structures
- managed_send_buffer::sptr &buff = _props[index].buff;
- vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
- const tx_streamer::buffs_type &buffs = *_convert_buffs;
-
- //fill IO buffs with pointers into the output buffer
- const void *io_buffs[4/*max interleave*/];
- for (size_t i = 0; i < _num_inputs; i++){
- const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]);
- io_buffs[i] = b + _convert_buffer_offset_bytes;
+ //maximum number of cycles to spin before waiting on condition variable
+ //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop
+ //the assumption is that anything held up for 15ms can wait
+ static const size_t MAX_SPIN_CYCLES = 30000000;
+
+ //maximum amount of time to wait before checking the stop flag
+ static const double MAX_WAIT = 0.1;
+
+ managed_send_buffer::sptr buff;
+ vrt::if_packet_info_t if_packet_info;
+ std::vector<const void *> in_buffs(MAX_INTERLEAVE);
+ boost::shared_ptr<worker_thread_data_t> worker_data = _worker_data[index];
+ boost::unique_lock<boost::mutex> lock(worker_data->data_ready_lock);
+ size_t spins = 0;
+
+ while (not worker_data->stop)
+ {
+ if (not buff)
+ {
+ buff = _props[index].get_buff(MAX_WAIT);
+ if (not buff)
+ {
+ continue;
+ }
+ worker_data->ready = true;
+ }
+
+ //make sure done flag is cleared by controlling thread before waiting on go signal
+ if (worker_data->done)
+ {
+ continue;
+ }
+
+ //partial spin lock before wait
+ while (not worker_data->go and spins < MAX_SPIN_CYCLES)
+ {
+ spins++;
+ }
+ if (not worker_data->go and
+ not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000))))
+ {
+ continue;
+ }
+ // Clear the go flag immediately to let the
+ // controlling thread know we are not sleeping.
+ worker_data->go = false;
+
+ //reset the spin count
+ spins = 0;
+
+ //pack metadata into a vrt header
+ uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32;
+ if_packet_info = *_convert_if_packet_info;
+ if_packet_info.has_sid = _props[index].has_sid;
+ if_packet_info.sid = _props[index].sid;
+ _vrt_packer(otw_mem, if_packet_info);
+ otw_mem += if_packet_info.num_header_words32;
+
+ //prepare the input buffers
+ for (size_t i = 0; i < _num_inputs; i++)
+ {
+ in_buffs[i] =
+ (reinterpret_cast<const char *>((*_convert_buffs)[index*_num_inputs + i]))
+ + _convert_buffer_offset_bytes;
+ }
+
+ //perform the conversion operation
+ _converter->conv(in_buffs, otw_mem, _convert_nsamps);
+
+ //let the master know that new data can be prepared
+ _worker_data[index]->done = true;
+
+ //commit the samples to the zero-copy interface
+ buff->commit(
+ (_header_offset_words32 + if_packet_info.num_packet_words32)
+ * sizeof(uint32_t)
+ );
+
+ //release the buffer
+ buff.reset();
}
- const ref_vector<const void *> in_buffs(io_buffs, _num_inputs);
-
- //pack metadata into a vrt header
- uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32;
- if_packet_info.has_sid = _props[index].has_sid;
- if_packet_info.sid = _props[index].sid;
- _vrt_packer(otw_mem, if_packet_info);
- otw_mem += if_packet_info.num_header_words32;
-
- //perform the conversion operation
- _converter->conv(in_buffs, otw_mem, _convert_nsamps);
-
- //commit the samples to the zero-copy interface
- const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
- buff->commit(num_vita_words32*sizeof(uint32_t));
- buff.reset(); //effectively a release
}
//! Shared variables for the worker threads
@@ -445,7 +563,18 @@ private:
const tx_streamer::buffs_type *_convert_buffs;
size_t _convert_buffer_offset_bytes;
vrt::if_packet_info_t *_convert_if_packet_info;
-
+ struct worker_thread_data_t {
+ worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {}
+ boost::atomic_bool ready;
+ boost::atomic_bool go;
+ boost::atomic_bool done;
+ boost::atomic_bool stop;
+ boost::mutex data_ready_lock;
+ boost::condition_variable data_ready;
+ };
+ std::vector< boost::shared_ptr<worker_thread_data_t> > _worker_data;
+ boost::thread_group _worker_thread_group;
+ std::vector<boost::thread *> _worker_threads;
};
class send_packet_streamer : public send_packet_handler, public tx_streamer{