diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 211 |
1 files changed, 170 insertions, 41 deletions
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 0acc8df4b..431cbf216 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -29,10 +29,13 @@ #include <uhd/types/metadata.hpp> #include <uhd/transport/vrt_if_packet.hpp> #include <uhd/transport/zero_copy.hpp> +#include <uhd/utils/safe_call.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/thread_time.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> +#include <boost/atomic.hpp> +#include <boost/make_shared.hpp> #include <iostream> #include <vector> @@ -49,6 +52,9 @@ namespace uhd { namespace transport { namespace sph { +static const size_t MAX_INTERLEAVE = 4; +static const double GET_BUFF_TIMEOUT = 0.1; + /*********************************************************************** * Super send packet handler * @@ -68,19 +74,39 @@ public: * \param size the number of transport channels */ send_packet_handler(const size_t size = 1): - _next_packet_seq(0), _cached_metadata(false) + _next_packet_seq(0), _cached_metadata(false) { this->set_enable_trailer(true); this->resize(size); } ~send_packet_handler(void){ - /* NOP */ + UHD_SAFE_CALL( + for (size_t i = 0; i < _worker_data.size(); i++) + { + _worker_data[i]->stop = true; + } + _worker_thread_group.join_all(); + ); } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; + + // Stop all worker threads + for (size_t i = 0; i < _worker_data.size(); i++) + { + _worker_data[i]->stop = true; + } + _worker_thread_group.join_all(); + _worker_threads.resize(size); + _worker_data.resize(size); + for (size_t i = 0; i < size; i++) + { + _worker_data[i] = boost::make_shared<worker_thread_data_t>(); + } + _props.resize(size); static const uint64_t zero = 0; _zero_buffs.resize(size, &zero); @@ -145,7 +171,15 @@ public: * \param get_buff the getter function */ void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + if (_worker_threads[xport_chan]) + { + _worker_thread_group.remove_thread(_worker_threads[xport_chan]); + _worker_data[xport_chan]->stop = true; + _worker_threads[xport_chan]->join(); + _worker_data[xport_chan]->stop = false; + } _props.at(xport_chan).get_buff = get_buff; + _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan)); } //! Set the conversion routine for all channels @@ -381,63 +415,147 @@ private: if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t); if_packet_info.packet_count = _next_packet_seq; - //get a buffer for each channel or timeout - BOOST_FOREACH(xport_chan_props_type &props, _props){ - if (not props.buff) props.buff = props.get_buff(timeout); - if (not props.buff) return 0; //timeout + // wait for all worker threads to be ready or timeout + boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000)); + for (size_t i = 0; i < this->size(); i++) + { + while (not _worker_data[i]->ready) + { + if (boost::get_system_time() > expiration) + { + return 0; + } + } + _worker_data[i]->ready = false; } - //setup the data to share with converter threads + //setup the data to share with worker threads _convert_nsamps = nsamps_per_buff; _convert_buffs = &buffs; _convert_buffer_offset_bytes = buffer_offset_bytes; _convert_if_packet_info = &if_packet_info; - //perform N channels of conversion - for (size_t i = 0; i < this->size(); i++) { - convert_to_in_buff(i); + //start N channels of conversion + for (size_t i = 0; i < this->size(); i++) + { + _worker_data[i]->go = true; + } + + //make sure any sleeping worker threads are woken up + for (size_t i = 0; i < this->size(); i++) + { + // Acquiring the lock used by the condition variable + // takes too long, so do a spin wait. If the go flag + // is not cleared by this point, it will be cleared + // immediately by the worker thread when it wakes up. + while (_worker_data[i]->go) + { + _worker_data[i]->data_ready.notify_one(); + } + } + + //wait for all worker threads to be done + for (size_t i = 0; i < this->size(); i++) + { + //TODO: Implement a better wait strategy + //busy loop give fastest response, but these are just wasted cycles + while (not _worker_data[i]->done) {} + _worker_data[i]->done = false; } _next_packet_seq++; //increment sequence after commits return nsamps_per_buff; } - /*! Run the conversion from the internal buffers to the user's input - * buffer. + /*! Worker thread routine. * + * - Gets an internal data buffer * - Calls the converter * - Releases internal data buffers - * - Updates read/write pointers */ - UHD_INLINE void convert_to_in_buff(const size_t index) + void worker(const size_t index) { - //shortcut references to local data structures - managed_send_buffer::sptr &buff = _props[index].buff; - vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; - const tx_streamer::buffs_type &buffs = *_convert_buffs; - - //fill IO buffs with pointers into the output buffer - const void *io_buffs[4/*max interleave*/]; - for (size_t i = 0; i < _num_inputs; i++){ - const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]); - io_buffs[i] = b + _convert_buffer_offset_bytes; + //maximum number of cycles to spin before waiting on condition variable + //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop + //the assumption is that anything held up for 15ms can wait + static const size_t MAX_SPIN_CYCLES = 30000000; + + //maximum amount of time to wait before checking the stop flag + static const double MAX_WAIT = 0.1; + + managed_send_buffer::sptr buff; + vrt::if_packet_info_t if_packet_info; + std::vector<const void *> in_buffs(MAX_INTERLEAVE); + boost::shared_ptr<worker_thread_data_t> worker_data = _worker_data[index]; + boost::unique_lock<boost::mutex> lock(worker_data->data_ready_lock); + size_t spins = 0; + + while (not worker_data->stop) + { + if (not buff) + { + buff = _props[index].get_buff(MAX_WAIT); + if (not buff) + { + continue; + } + worker_data->ready = true; + } + + //make sure done flag is cleared by controlling thread before waiting on go signal + if (worker_data->done) + { + continue; + } + + //partial spin lock before wait + while (not worker_data->go and spins < MAX_SPIN_CYCLES) + { + spins++; + } + if (not worker_data->go and + not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000)))) + { + continue; + } + // Clear the go flag immediately to let the + // controlling thread know we are not sleeping. + worker_data->go = false; + + //reset the spin count + spins = 0; + + //pack metadata into a vrt header + uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; + if_packet_info = *_convert_if_packet_info; + if_packet_info.has_sid = _props[index].has_sid; + if_packet_info.sid = _props[index].sid; + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //prepare the input buffers + for (size_t i = 0; i < _num_inputs; i++) + { + in_buffs[i] = + (reinterpret_cast<const char *>((*_convert_buffs)[index*_num_inputs + i])) + + _convert_buffer_offset_bytes; + } + + //perform the conversion operation + _converter->conv(in_buffs, otw_mem, _convert_nsamps); + + //let the master know that new data can be prepared + _worker_data[index]->done = true; + + //commit the samples to the zero-copy interface + buff->commit( + (_header_offset_words32 + if_packet_info.num_packet_words32) + * sizeof(uint32_t) + ); + + //release the buffer + buff.reset(); } - const ref_vector<const void *> in_buffs(io_buffs, _num_inputs); - - //pack metadata into a vrt header - uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32; - if_packet_info.has_sid = _props[index].has_sid; - if_packet_info.sid = _props[index].sid; - _vrt_packer(otw_mem, if_packet_info); - otw_mem += if_packet_info.num_header_words32; - - //perform the conversion operation - _converter->conv(in_buffs, otw_mem, _convert_nsamps); - - //commit the samples to the zero-copy interface - const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; - buff->commit(num_vita_words32*sizeof(uint32_t)); - buff.reset(); //effectively a release } //! Shared variables for the worker threads @@ -445,7 +563,18 @@ private: const tx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; vrt::if_packet_info_t *_convert_if_packet_info; - + struct worker_thread_data_t { + worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {} + boost::atomic_bool ready; + boost::atomic_bool go; + boost::atomic_bool done; + boost::atomic_bool stop; + boost::mutex data_ready_lock; + boost::condition_variable data_ready; + }; + std::vector< boost::shared_ptr<worker_thread_data_t> > _worker_data; + boost::thread_group _worker_thread_group; + std::vector<boost::thread *> _worker_threads; }; class send_packet_streamer : public send_packet_handler, public tx_streamer{ |