From e46bfc091720aacacb199cf79222a76abdb94c21 Mon Sep 17 00:00:00 2001 From: Michael West Date: Wed, 19 Apr 2017 14:03:53 -0700 Subject: Revert commit e348353c4f5acef6a5ece11e9c336df4c15d65e1. Worker threads significantly increased CPU load and did not increase performance as expected. --- host/lib/transport/super_send_packet_handler.hpp | 211 +++++------------------ 1 file changed, 41 insertions(+), 170 deletions(-) (limited to 'host/lib') diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 431cbf216..0acc8df4b 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -29,13 +29,10 @@ #include #include #include -#include #include #include #include #include -#include -#include #include #include @@ -52,9 +49,6 @@ namespace uhd { namespace transport { namespace sph { -static const size_t MAX_INTERLEAVE = 4; -static const double GET_BUFF_TIMEOUT = 0.1; - /*********************************************************************** * Super send packet handler * @@ -74,39 +68,19 @@ public: * \param size the number of transport channels */ send_packet_handler(const size_t size = 1): - _next_packet_seq(0), _cached_metadata(false) + _next_packet_seq(0), _cached_metadata(false) { this->set_enable_trailer(true); this->resize(size); } ~send_packet_handler(void){ - UHD_SAFE_CALL( - for (size_t i = 0; i < _worker_data.size(); i++) - { - _worker_data[i]->stop = true; - } - _worker_thread_group.join_all(); - ); + /* NOP */ } //! Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; - - // Stop all worker threads - for (size_t i = 0; i < _worker_data.size(); i++) - { - _worker_data[i]->stop = true; - } - _worker_thread_group.join_all(); - _worker_threads.resize(size); - _worker_data.resize(size); - for (size_t i = 0; i < size; i++) - { - _worker_data[i] = boost::make_shared(); - } - _props.resize(size); static const uint64_t zero = 0; _zero_buffs.resize(size, &zero); @@ -171,15 +145,7 @@ public: * \param get_buff the getter function */ void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ - if (_worker_threads[xport_chan]) - { - _worker_thread_group.remove_thread(_worker_threads[xport_chan]); - _worker_data[xport_chan]->stop = true; - _worker_threads[xport_chan]->join(); - _worker_data[xport_chan]->stop = false; - } _props.at(xport_chan).get_buff = get_buff; - _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan)); } //! Set the conversion routine for all channels @@ -415,147 +381,63 @@ private: if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t); if_packet_info.packet_count = _next_packet_seq; - // wait for all worker threads to be ready or timeout - boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000)); - for (size_t i = 0; i < this->size(); i++) - { - while (not _worker_data[i]->ready) - { - if (boost::get_system_time() > expiration) - { - return 0; - } - } - _worker_data[i]->ready = false; + //get a buffer for each channel or timeout + BOOST_FOREACH(xport_chan_props_type &props, _props){ + if (not props.buff) props.buff = props.get_buff(timeout); + if (not props.buff) return 0; //timeout } - //setup the data to share with worker threads + //setup the data to share with converter threads _convert_nsamps = nsamps_per_buff; _convert_buffs = &buffs; _convert_buffer_offset_bytes = buffer_offset_bytes; _convert_if_packet_info = &if_packet_info; - //start N channels of conversion - for (size_t i = 0; i < this->size(); i++) - { - _worker_data[i]->go = true; - } - - //make sure any sleeping worker threads are woken up - for (size_t i = 0; i < this->size(); i++) - { - // Acquiring the lock used by the condition variable - // takes too long, so do a spin wait. If the go flag - // is not cleared by this point, it will be cleared - // immediately by the worker thread when it wakes up. - while (_worker_data[i]->go) - { - _worker_data[i]->data_ready.notify_one(); - } - } - - //wait for all worker threads to be done - for (size_t i = 0; i < this->size(); i++) - { - //TODO: Implement a better wait strategy - //busy loop give fastest response, but these are just wasted cycles - while (not _worker_data[i]->done) {} - _worker_data[i]->done = false; + //perform N channels of conversion + for (size_t i = 0; i < this->size(); i++) { + convert_to_in_buff(i); } _next_packet_seq++; //increment sequence after commits return nsamps_per_buff; } - /*! Worker thread routine. + /*! Run the conversion from the internal buffers to the user's input + * buffer. * - * - Gets an internal data buffer * - Calls the converter * - Releases internal data buffers + * - Updates read/write pointers */ - void worker(const size_t index) + UHD_INLINE void convert_to_in_buff(const size_t index) { - //maximum number of cycles to spin before waiting on condition variable - //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop - //the assumption is that anything held up for 15ms can wait - static const size_t MAX_SPIN_CYCLES = 30000000; - - //maximum amount of time to wait before checking the stop flag - static const double MAX_WAIT = 0.1; - - managed_send_buffer::sptr buff; - vrt::if_packet_info_t if_packet_info; - std::vector in_buffs(MAX_INTERLEAVE); - boost::shared_ptr worker_data = _worker_data[index]; - boost::unique_lock lock(worker_data->data_ready_lock); - size_t spins = 0; - - while (not worker_data->stop) - { - if (not buff) - { - buff = _props[index].get_buff(MAX_WAIT); - if (not buff) - { - continue; - } - worker_data->ready = true; - } - - //make sure done flag is cleared by controlling thread before waiting on go signal - if (worker_data->done) - { - continue; - } - - //partial spin lock before wait - while (not worker_data->go and spins < MAX_SPIN_CYCLES) - { - spins++; - } - if (not worker_data->go and - not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000)))) - { - continue; - } - // Clear the go flag immediately to let the - // controlling thread know we are not sleeping. - worker_data->go = false; - - //reset the spin count - spins = 0; - - //pack metadata into a vrt header - uint32_t *otw_mem = buff->cast() + _header_offset_words32; - if_packet_info = *_convert_if_packet_info; - if_packet_info.has_sid = _props[index].has_sid; - if_packet_info.sid = _props[index].sid; - _vrt_packer(otw_mem, if_packet_info); - otw_mem += if_packet_info.num_header_words32; - - //prepare the input buffers - for (size_t i = 0; i < _num_inputs; i++) - { - in_buffs[i] = - (reinterpret_cast((*_convert_buffs)[index*_num_inputs + i])) - + _convert_buffer_offset_bytes; - } - - //perform the conversion operation - _converter->conv(in_buffs, otw_mem, _convert_nsamps); - - //let the master know that new data can be prepared - _worker_data[index]->done = true; - - //commit the samples to the zero-copy interface - buff->commit( - (_header_offset_words32 + if_packet_info.num_packet_words32) - * sizeof(uint32_t) - ); - - //release the buffer - buff.reset(); + //shortcut references to local data structures + managed_send_buffer::sptr &buff = _props[index].buff; + vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; + const tx_streamer::buffs_type &buffs = *_convert_buffs; + + //fill IO buffs with pointers into the output buffer + const void *io_buffs[4/*max interleave*/]; + for (size_t i = 0; i < _num_inputs; i++){ + const char *b = reinterpret_cast(buffs[index*_num_inputs + i]); + io_buffs[i] = b + _convert_buffer_offset_bytes; } + const ref_vector in_buffs(io_buffs, _num_inputs); + + //pack metadata into a vrt header + uint32_t *otw_mem = buff->cast() + _header_offset_words32; + if_packet_info.has_sid = _props[index].has_sid; + if_packet_info.sid = _props[index].sid; + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //perform the conversion operation + _converter->conv(in_buffs, otw_mem, _convert_nsamps); + + //commit the samples to the zero-copy interface + const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; + buff->commit(num_vita_words32*sizeof(uint32_t)); + buff.reset(); //effectively a release } //! Shared variables for the worker threads @@ -563,18 +445,7 @@ private: const tx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; vrt::if_packet_info_t *_convert_if_packet_info; - struct worker_thread_data_t { - worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {} - boost::atomic_bool ready; - boost::atomic_bool go; - boost::atomic_bool done; - boost::atomic_bool stop; - boost::mutex data_ready_lock; - boost::condition_variable data_ready; - }; - std::vector< boost::shared_ptr > _worker_data; - boost::thread_group _worker_thread_group; - std::vector _worker_threads; + }; class send_packet_streamer : public send_packet_handler, public tx_streamer{ -- cgit v1.2.3 From 86c74ee9b497da42df30870a7943f011151b7a49 Mon Sep 17 00:00:00 2001 From: Michael West Date: Wed, 19 Apr 2017 15:25:45 -0700 Subject: PCIe: Fix runtime page size acquisition and page size alignment checks for nirio_zero_copy transport. --- host/lib/transport/nirio_zero_copy.cpp | 39 +++++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) (limited to 'host/lib') diff --git a/host/lib/transport/nirio_zero_copy.cpp b/host/lib/transport/nirio_zero_copy.cpp index 4212fab42..ae32727d7 100644 --- a/host/lib/transport/nirio_zero_copy.cpp +++ b/host/lib/transport/nirio_zero_copy.cpp @@ -32,6 +32,23 @@ //@TODO: Move the register defs required by the class to a common location #include "../usrp/x300/x300_regs.hpp" +#if defined(_WIN32) || defined(__WIN32__) || defined(WIN32) +#include +static UHD_INLINE size_t get_page_size() +{ + SYSTEM_INFO si; + GetSystemInfo(&si); + return si.dwPageSize; +} +#else +#include +static UHD_INLINE size_t get_page_size() +{ + return size_t(sysconf(_SC_PAGESIZE)); +} +#endif +static const size_t page_size = get_page_size(); + using namespace uhd; using namespace uhd::transport; using namespace uhd::niusrprio; @@ -351,7 +368,6 @@ nirio_zero_copy::sptr nirio_zero_copy::make( ){ //Initialize xport_params zero_copy_xport_params xport_params = default_buff_args; - size_t page_size = boost::interprocess::mapped_region::get_page_size(); //The kernel buffer for this transport must be (num_frames * frame_size) big. Unlike ethernet, //where the kernel buffer size is independent of the circular buffer size for the transport, @@ -366,6 +382,22 @@ nirio_zero_copy::sptr nirio_zero_copy::make( size_t usr_recv_buff_size = static_cast( hints.cast("recv_buff_size", default_buff_args.num_recv_frames)); + if (hints.has_key("recv_buff_size")) + { + if (usr_recv_buff_size % page_size != 0) + { + throw uhd::value_error((boost::format("recv_buff_size must be multiple of %d") % page_size).str()); + } + } + + if (hints.has_key("recv_frame_size") and hints.has_key("num_recv_frames")) + { + if (usr_num_recv_frames * xport_params.recv_frame_size % page_size != 0) + { + throw uhd::value_error((boost::format("num_recv_frames * recv_frame_size must be an even multiple of %d") % page_size).str()); + } + } + if (hints.has_key("num_recv_frames") and hints.has_key("recv_buff_size")) { if (usr_recv_buff_size < xport_params.recv_frame_size) throw uhd::value_error("recv_buff_size must be equal to or greater than (num_recv_frames * recv_frame_size)"); @@ -380,6 +412,11 @@ nirio_zero_copy::sptr nirio_zero_copy::make( xport_params.num_recv_frames = usr_num_recv_frames; } + if (xport_params.num_recv_frames * xport_params.recv_frame_size % page_size != 0) + { + throw uhd::value_error((boost::format("num_recv_frames * recv_frame_size must be an even multiple of %d") % page_size).str()); + } + //TX xport_params.send_frame_size = size_t(hints.cast("send_frame_size", default_buff_args.send_frame_size)); -- cgit v1.2.3 From d2a354bbe4b8ecd81f5c3bdb459a9ae822f91e9a Mon Sep 17 00:00:00 2001 From: Andrej Rode Date: Tue, 25 Apr 2017 12:30:33 -0700 Subject: uhd: cast thread_group.create_thread() return value to void to avoid memory leak --- host/examples/network_relay.cpp | 4 ++-- host/lib/utils/tasks.cpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'host/lib') diff --git a/host/examples/network_relay.cpp b/host/examples/network_relay.cpp index b16bcaaa5..7e354934a 100644 --- a/host/examples/network_relay.cpp +++ b/host/examples/network_relay.cpp @@ -100,9 +100,9 @@ public: std::cout << "spawning relay threads... " << _port << std::endl; boost::unique_lock lock(spawn_mutex); // lock in preparation to wait for threads to spawn - _thread_group.create_thread(boost::bind(&udp_relay_type::server_thread, this)); + (void)_thread_group.create_thread(boost::bind(&udp_relay_type::server_thread, this)); wait_for_thread.wait(lock); // wait for thread to spin up - _thread_group.create_thread(boost::bind(&udp_relay_type::client_thread, this)); + (void)_thread_group.create_thread(boost::bind(&udp_relay_type::client_thread, this)); wait_for_thread.wait(lock); // wait for thread to spin up std::cout << " done!" << std::endl << std::endl; } diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp index 1e8f50736..661315ae8 100644 --- a/host/lib/utils/tasks.cpp +++ b/host/lib/utils/tasks.cpp @@ -32,7 +32,7 @@ public: task_impl(const task_fcn_type &task_fcn): _spawn_barrier(2) { - _thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn)); + (void)_thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn)); _spawn_barrier.wait(); } @@ -99,7 +99,7 @@ public: msg_task_impl(const task_fcn_type &task_fcn): _spawn_barrier(2) { - _thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn)); + (void)_thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn)); _spawn_barrier.wait(); } -- cgit v1.2.3 From e6c8cee6e9e6dbe257bc6a77899306e611d44d71 Mon Sep 17 00:00:00 2001 From: Andrej Rode Date: Tue, 25 Apr 2017 12:33:07 -0700 Subject: coverity: fix various minor issues --- host/lib/rfnoc/nocscript/expression.hpp | 2 ++ host/utils/usrp_n2xx_simple_net_burner.cpp | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'host/lib') diff --git a/host/lib/rfnoc/nocscript/expression.hpp b/host/lib/rfnoc/nocscript/expression.hpp index 83fc5bcbc..1acd02009 100644 --- a/host/lib/rfnoc/nocscript/expression.hpp +++ b/host/lib/rfnoc/nocscript/expression.hpp @@ -215,6 +215,7 @@ class expression_container : public expression //! Create an empty container expression_container() : _combiner(COMBINE_NOTSET) {}; + virtual ~expression_container(){}; /*! Type-deduction rules for containers are: * - If the combination type is COMBINE_ALL or COMBINE_AND, @@ -299,6 +300,7 @@ class expression_function : public expression_container const std::string &name, const boost::shared_ptr func_table ); + ~expression_function(){}; //! Add an argument expression virtual void add(expression::sptr new_expr); diff --git a/host/utils/usrp_n2xx_simple_net_burner.cpp b/host/utils/usrp_n2xx_simple_net_burner.cpp index f85ea9def..20070503c 100644 --- a/host/utils/usrp_n2xx_simple_net_burner.cpp +++ b/host/utils/usrp_n2xx_simple_net_burner.cpp @@ -151,7 +151,7 @@ void sig_int_handler(int){ void list_usrps(){ udp_simple::sptr udp_bc_transport; const usrp2_fw_update_data_t *update_data_in = reinterpret_cast(usrp2_update_data_in_mem); - uint32_t hw_rev; + uint32_t hw_rev = 0; usrp2_fw_update_data_t usrp2_ack_pkt = usrp2_fw_update_data_t(); usrp2_ack_pkt.proto_ver = htonx(USRP2_FW_PROTO_VERSION); -- cgit v1.2.3