From 5b07551577d5c596cb690484ad190d8ad8bac643 Mon Sep 17 00:00:00 2001 From: michael-west Date: Wed, 8 Mar 2017 17:17:39 -0800 Subject: Added class to add flow control to any zero_copy_if interface. --- host/lib/transport/CMakeLists.txt | 1 + host/lib/transport/zero_copy_flow_ctrl.cpp | 227 +++++++++++++++++++++++++++++ 2 files changed, 228 insertions(+) create mode 100644 host/lib/transport/zero_copy_flow_ctrl.cpp (limited to 'host/lib') diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 44c8d59af..a6d84cc4a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -129,6 +129,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp ) IF(ENABLE_X300) diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp new file mode 100644 index 000000000..06d7934e2 --- /dev/null +++ b/host/lib/transport/zero_copy_flow_ctrl.cpp @@ -0,0 +1,227 @@ +// +// Copyright 2017 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. 
+// + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::transport; + +typedef bounded_buffer bounded_buffer_t; + +class zero_copy_flow_ctrl_msb : public managed_send_buffer +{ +public: + zero_copy_flow_ctrl_msb( + flow_ctrl_func flow_ctrl + ) : + _mb(NULL), + _flow_ctrl(flow_ctrl) + { + /* NOP */ + } + + ~zero_copy_flow_ctrl_msb() + { + /* NOP */ + } + + void release() + { + if (_mb) + { + _mb->commit(size()); + while (_flow_ctrl and not _flow_ctrl(_mb)) {} + _mb.reset(); + } + } + + UHD_INLINE sptr get(sptr &mb) + { + _mb = mb; + return make(this, _mb->cast(), _mb->size()); + } + +private: + sptr _mb; + flow_ctrl_func _flow_ctrl; +}; + +class zero_copy_flow_ctrl_mrb : public managed_recv_buffer +{ +public: + zero_copy_flow_ctrl_mrb( + flow_ctrl_func flow_ctrl + ) : + _mb(NULL), + _flow_ctrl(flow_ctrl) + { + /* NOP */ + } + + ~zero_copy_flow_ctrl_mrb() + { + /* NOP */ + } + + void release() + { + if (_mb) + { + _mb->commit(size()); + while (_flow_ctrl and not _flow_ctrl(_mb)) {} + _mb.reset(); + } + } + + UHD_INLINE sptr get(sptr &mb) + { + _mb = mb; + return make(this, _mb->cast(), _mb->size()); + } + +private: + sptr _mb; + flow_ctrl_func _flow_ctrl; +}; + +/*********************************************************************** + * Zero copy flow control transport: + * An intermediate transport that wraps each buffer of the underlying + * transport and runs the flow control function when it is released. 
+ **********************************************************************/ +class zero_copy_flow_ctrl_impl : public zero_copy_flow_ctrl { +public: + typedef boost::shared_ptr sptr; + + zero_copy_flow_ctrl_impl(zero_copy_if::sptr transport, + flow_ctrl_func send_flow_ctrl, + flow_ctrl_func recv_flow_ctrl) : + _transport(transport), + _send_buffers(transport->get_num_send_frames()), + _recv_buffers(transport->get_num_recv_frames()), + _send_buff_index(0), + _recv_buff_index(0), + _send_flow_ctrl(send_flow_ctrl), + _recv_flow_ctrl(recv_flow_ctrl) + { + UHD_LOG << "Created zero_copy_flow_ctrl" << std::endl; + + for (size_t i = 0; i < transport->get_num_send_frames(); i++) + { + _send_buffers[i] = boost::make_shared(_send_flow_ctrl); + } + for (size_t i = 0; i < transport->get_num_recv_frames(); i++) + { + _recv_buffers[i] = boost::make_shared(_recv_flow_ctrl); + } + } + + ~zero_copy_flow_ctrl_impl() + { + } + + /******************************************************************* + * Receive implementation: + * Pop the receive buffer pointer from the underlying transport + ******************************************************************/ + UHD_INLINE managed_recv_buffer::sptr get_recv_buff(double timeout) + { + managed_recv_buffer::sptr ptr; + managed_recv_buffer::sptr buff = _transport->get_recv_buff(timeout); + if (buff) + { + boost::shared_ptr mb = _recv_buffers[_recv_buff_index++]; + _recv_buff_index %= _recv_buffers.size(); + ptr = mb->get(buff); + } + return ptr; + } + + UHD_INLINE size_t get_num_recv_frames() const + { + return _transport->get_num_recv_frames(); + } + + UHD_INLINE size_t get_recv_frame_size() const + { + return _transport->get_recv_frame_size(); + } + + /******************************************************************* + * Send implementation: + * Pass the send buffer pointer from the underlying transport + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double timeout) + { + 
managed_send_buffer::sptr ptr; + managed_send_buffer::sptr buff = _transport->get_send_buff(timeout); + if (buff) + { + boost::shared_ptr mb = _send_buffers[_send_buff_index++]; + _send_buff_index %= _send_buffers.size(); + ptr = mb->get(buff); + } + return ptr; + } + + UHD_INLINE size_t get_num_send_frames() const + { + return _transport->get_num_send_frames(); + } + + UHD_INLINE size_t get_send_frame_size() const + { + return _transport->get_send_frame_size(); + } + +private: + // The underlying transport + zero_copy_if::sptr _transport; + + // buffers + std::vector< boost::shared_ptr > _send_buffers; + std::vector< boost::shared_ptr > _recv_buffers; + size_t _send_buff_index; + size_t _recv_buff_index; + + // Flow control functions + flow_ctrl_func _send_flow_ctrl; + flow_ctrl_func _recv_flow_ctrl; +}; + +zero_copy_flow_ctrl::sptr zero_copy_flow_ctrl::make( + zero_copy_if::sptr transport, + flow_ctrl_func send_flow_ctrl, + flow_ctrl_func recv_flow_ctrl +) +{ + zero_copy_flow_ctrl_impl::sptr zero_copy_flow_ctrl( + new zero_copy_flow_ctrl_impl(transport, send_flow_ctrl, recv_flow_ctrl) + ); + + return zero_copy_flow_ctrl; +} -- cgit v1.2.3 From 171f85e4c7c0c1dbe796c5ba42215184ef2d6065 Mon Sep 17 00:00:00 2001 From: michael-west Date: Wed, 8 Mar 2017 17:18:42 -0800 Subject: Device3: Improved send flow control - Implemented zero_copy_flow_ctrl for send transport - Removed bounded buffer for sequence acks - Created spin wait on flow control for fastest response to flow control updates --- host/lib/usrp/device3/device3_io_impl.cpp | 68 ++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 24 deletions(-) (limited to 'host/lib') diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp index c1bfd1606..aa84dcab0 100644 --- a/host/lib/usrp/device3/device3_io_impl.cpp +++ b/host/lib/usrp/device3/device3_io_impl.cpp @@ -31,6 +31,8 @@ #include "../../rfnoc/tx_stream_terminator.hpp" #include #include +#include +#include 
#define UHD_STREAMER_LOG() UHD_LOGV(never) @@ -304,12 +306,13 @@ struct tx_fc_cache_t device_channel(0), last_seq_out(0), last_seq_ack(0), - seq_queue(1){} + last_seq_ack_cache(0) {} + size_t stream_channel; size_t device_channel; size_t last_seq_out; - size_t last_seq_ack; - uhd::transport::bounded_buffer seq_queue; + boost::atomic_size_t last_seq_ack; + size_t last_seq_ack_cache; boost::shared_ptr async_queue; boost::shared_ptr old_async_queue; }; @@ -337,33 +340,43 @@ static size_t get_tx_flow_control_window( return window_in_pkts; } -static managed_send_buffer::sptr get_tx_buff_with_flowctrl( - task::sptr /*holds ref*/, - boost::shared_ptr fc_cache, +// TODO: Remove this function +// This function only exists to make sure the transport is not destroyed +// until it is no longer needed. +static managed_send_buffer::sptr get_tx_buff( zero_copy_if::sptr xport, - size_t fc_window, const double timeout ){ + return xport->get_send_buff(timeout); +} + +static bool tx_flow_ctrl( + task::sptr /*holds ref*/, + boost::shared_ptr fc_cache, + size_t fc_window, + managed_buffer::sptr +) { + // Busy loop waiting for flow control update. This is necessary because + // at this point there is data trying to be sent and it must be sent as + // quickly as possible when the flow control update arrives to avoid + // underruns at high rates. This is also OK because it only occurs when + // data needs to be sent and flow control is holding it back. while (true) { // delta is the amount of FC credit we've used up - const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - (fc_cache->last_seq_ack & HW_SEQ_NUM_MASK); + const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - + (fc_cache->last_seq_ack_cache & HW_SEQ_NUM_MASK); // If we want to send another packet, we must have FC credit left if ((delta & HW_SEQ_NUM_MASK) < fc_window) - break; - - // If credit is all used up, we check seq_queue for more. 
- const bool ok = fc_cache->seq_queue.pop_with_timed_wait(fc_cache->last_seq_ack, timeout); - if (not ok) { - return managed_send_buffer::sptr(); //timeout waiting for flow control + { + // Packet will be sent + fc_cache->last_seq_out++; //update seq + return true; } + // update the cached value from the atomic + fc_cache->last_seq_ack_cache = fc_cache->last_seq_ack; } - - managed_send_buffer::sptr buff = xport->get_send_buff(timeout); - if (buff) { - fc_cache->last_seq_out++; //update seq, this will actually be a send - } - return buff; + return false; } #define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0 @@ -380,7 +393,9 @@ static void handle_tx_async_msgs( ) { managed_recv_buffer::sptr buff = xport->get_recv_buff(); if (not buff) + { return; + } //extract packet info vrt::if_packet_info_t if_packet_info; @@ -429,8 +444,7 @@ static void handle_tx_async_msgs( //The FC response and the burst ack are two indicators that the radio //consumed packets. Use them to update the FC metadata if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) { - const size_t seq = metadata.user_payload[0]; - fc_cache->seq_queue.push_with_pop_on_full(seq); + fc_cache->last_seq_ack = metadata.user_payload[0]; } //FC responses don't propagate up to the user so filter them here @@ -841,13 +855,19 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_) } } + // Add flow control + xport.send = zero_copy_flow_ctrl::make( + xport.send, + boost::bind(&tx_flow_ctrl, task, fc_cache, fc_window, _1), + NULL); + //Give the streamer a functor to get the send buffer - //get_tx_buff_with_flowctrl is static so bind has no lifetime issues + //get_tx_buff is static so bind has no lifetime issues //xport.send (sptr) is required to add streamer->data-transport lifetime dependency //task (sptr) is required to add a streamer->async-handler lifetime dependency my_streamer->set_xport_chan_get_buff( stream_i, - boost::bind(&get_tx_buff_with_flowctrl, task, fc_cache, xport.send, 
fc_window, _1) + boost::bind(&get_tx_buff, xport.send, _1) ); //Give the streamer a functor handled received async messages my_streamer->set_async_receiver( -- cgit v1.2.3 From fe1088f9ce5bc6b2b2054a70aed60f62eb6c4c12 Mon Sep 17 00:00:00 2001 From: Michael West Date: Mon, 27 Mar 2017 19:22:13 -0700 Subject: X300: Change default frame sizes for PCIe to be page aligned for better performance --- host/lib/usrp/x300/x300_impl.cpp | 8 ++++---- host/lib/usrp/x300/x300_impl.hpp | 21 ++++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) (limited to 'host/lib') diff --git a/host/lib/usrp/x300/x300_impl.cpp b/host/lib/usrp/x300/x300_impl.cpp index fe422db46..4f3870357 100644 --- a/host/lib/usrp/x300/x300_impl.cpp +++ b/host/lib/usrp/x300/x300_impl.cpp @@ -1107,14 +1107,14 @@ uhd::both_xports_t x300_impl::make_transport( ? X300_PCIE_RX_DATA_FRAME_SIZE : X300_PCIE_MSG_FRAME_SIZE; - default_buff_args.num_send_frames = - (xport_type == TX_DATA) - ? X300_PCIE_DATA_NUM_FRAMES + default_buff_args.num_send_frames = + (xport_type == TX_DATA) + ? X300_PCIE_TX_DATA_NUM_FRAMES : X300_PCIE_MSG_NUM_FRAMES; default_buff_args.num_recv_frames = (xport_type == RX_DATA) - ? X300_PCIE_DATA_NUM_FRAMES + ? X300_PCIE_RX_DATA_NUM_FRAMES : X300_PCIE_MSG_NUM_FRAMES; xports.recv = nirio_zero_copy::make( diff --git a/host/lib/usrp/x300/x300_impl.hpp b/host/lib/usrp/x300/x300_impl.hpp index 27ea6f40e..14120bd1f 100644 --- a/host/lib/usrp/x300/x300_impl.hpp +++ b/host/lib/usrp/x300/x300_impl.hpp @@ -51,15 +51,18 @@ static const size_t X300_RX_SW_BUFF_SIZE_ETH = 0x2000000;//32MiB For a static const size_t X300_RX_SW_BUFF_SIZE_ETH_MACOS = 0x100000; //1Mib //The FIFO closest to the DMA controller is 1023 elements deep for RX and 1029 elements deep for TX -//where an element is 8 bytes. For best throughput ensure that the data frame fits in these buffers. 
-//Also ensure that the kernel has enough frames to hold buffered TX and RX data -static const size_t X300_PCIE_RX_DATA_FRAME_SIZE = 8184; //bytes -static const size_t X300_PCIE_TX_DATA_FRAME_SIZE = 8184; //bytes -static const size_t X300_PCIE_DATA_NUM_FRAMES = 2048; -static const size_t X300_PCIE_MSG_FRAME_SIZE = 256; //bytes -static const size_t X300_PCIE_MSG_NUM_FRAMES = 64; -static const size_t X300_PCIE_MAX_CHANNELS = 6; -static const size_t X300_PCIE_MAX_MUXED_XPORTS = 32; +//where an element is 8 bytes. The buffers (number of frames * frame size) must be aligned to the +//memory page size. For the control, we are getting lucky because 64 frames * 256 bytes each aligns +//with the typical page size of 4096 bytes. Since most page sizes are 4096 bytes or some multiple of +//that, keep the number of frames * frame size aligned to it. +static const size_t X300_PCIE_RX_DATA_FRAME_SIZE = 4096; //bytes +static const size_t X300_PCIE_RX_DATA_NUM_FRAMES = 4096; +static const size_t X300_PCIE_TX_DATA_FRAME_SIZE = 4096; //bytes +static const size_t X300_PCIE_TX_DATA_NUM_FRAMES = 4096; +static const size_t X300_PCIE_MSG_FRAME_SIZE = 256; //bytes +static const size_t X300_PCIE_MSG_NUM_FRAMES = 64; +static const size_t X300_PCIE_MAX_CHANNELS = 6; +static const size_t X300_PCIE_MAX_MUXED_XPORTS = 32; static const size_t X300_10GE_DATA_FRAME_MAX_SIZE = 8000; // CHDR packet size in bytes static const size_t X300_1GE_DATA_FRAME_MAX_SIZE = 1472; // CHDR packet size in bytes -- cgit v1.2.3 From b32055cbde789fffff78a0311e372ba24f6c8f19 Mon Sep 17 00:00:00 2001 From: Michael West Date: Mon, 27 Mar 2017 19:53:57 -0700 Subject: PCIe: Add checks to make sure buffers are page aligned (requirement of NI-RIO driver) --- host/lib/transport/nirio_zero_copy.cpp | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'host/lib') diff --git a/host/lib/transport/nirio_zero_copy.cpp b/host/lib/transport/nirio_zero_copy.cpp index 9ed02a6dc..4212fab42 100644 --- 
a/host/lib/transport/nirio_zero_copy.cpp +++ b/host/lib/transport/nirio_zero_copy.cpp @@ -26,6 +26,7 @@ #include #include #include //sleep +#include //get_page_size() #include #include // std::max //@TODO: Move the register defs required by the class to a common location @@ -350,6 +351,7 @@ nirio_zero_copy::sptr nirio_zero_copy::make( ){ //Initialize xport_params zero_copy_xport_params xport_params = default_buff_args; + size_t page_size = boost::interprocess::mapped_region::get_page_size(); //The kernel buffer for this transport must be (num_frames * frame_size) big. Unlike ethernet, //where the kernel buffer size is independent of the circular buffer size for the transport, @@ -386,6 +388,22 @@ nirio_zero_copy::sptr nirio_zero_copy::make( size_t usr_send_buff_size = static_cast( hints.cast("send_buff_size", default_buff_args.num_send_frames)); + if (hints.has_key("send_buff_size")) + { + if (usr_send_buff_size % page_size != 0) + { + throw uhd::value_error((boost::format("send_buff_size must be multiple of %d") % page_size).str()); + } + } + + if (hints.has_key("send_frame_size") and hints.has_key("num_send_frames")) + { + if (usr_num_send_frames * xport_params.send_frame_size % page_size != 0) + { + throw uhd::value_error((boost::format("num_send_frames * send_frame_size must be an even multiple of %d") % page_size).str()); + } + } + if (hints.has_key("num_send_frames") and hints.has_key("send_buff_size")) { if (usr_send_buff_size < xport_params.send_frame_size) throw uhd::value_error("send_buff_size must be equal to or greater than (num_send_frames * send_frame_size)"); @@ -400,6 +418,11 @@ nirio_zero_copy::sptr nirio_zero_copy::make( xport_params.num_send_frames = usr_num_send_frames; } + if (xport_params.num_send_frames * xport_params.send_frame_size % page_size != 0) + { + throw uhd::value_error((boost::format("num_send_frames * send_frame_size must be an even multiple of %d") % page_size).str()); + } + return nirio_zero_copy::sptr(new 
nirio_zero_copy_impl(fpga_session, instance, xport_params)); } -- cgit v1.2.3 From e348353c4f5acef6a5ece11e9c336df4c15d65e1 Mon Sep 17 00:00:00 2001 From: Michael West Date: Wed, 29 Mar 2017 13:10:32 -0700 Subject: Implement worker threads to offload conversion of data and transport I/O for send() calls. - One worker thread per channel provides for improved scalability --- host/lib/transport/super_send_packet_handler.hpp | 211 ++++++++++++++++++----- 1 file changed, 170 insertions(+), 41 deletions(-) (limited to 'host/lib') diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 0acc8df4b..431cbf216 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -29,10 +29,13 @@ #include #include #include +#include #include #include #include #include +#include +#include #include #include @@ -49,6 +52,9 @@ namespace uhd { namespace transport { namespace sph { +static const size_t MAX_INTERLEAVE = 4; +static const double GET_BUFF_TIMEOUT = 0.1; + /*********************************************************************** * Super send packet handler * @@ -68,19 +74,39 @@ public: * \param size the number of transport channels */ send_packet_handler(const size_t size = 1): - _next_packet_seq(0), _cached_metadata(false) + _next_packet_seq(0), _cached_metadata(false) { this->set_enable_trailer(true); this->resize(size); } ~send_packet_handler(void){ - /* NOP */ + UHD_SAFE_CALL( + for (size_t i = 0; i < _worker_data.size(); i++) + { + _worker_data[i]->stop = true; + } + _worker_thread_group.join_all(); + ); } //! 
Resize the number of transport channels void resize(const size_t size){ if (this->size() == size) return; + + // Stop all worker threads + for (size_t i = 0; i < _worker_data.size(); i++) + { + _worker_data[i]->stop = true; + } + _worker_thread_group.join_all(); + _worker_threads.resize(size); + _worker_data.resize(size); + for (size_t i = 0; i < size; i++) + { + _worker_data[i] = boost::make_shared(); + } + _props.resize(size); static const uint64_t zero = 0; _zero_buffs.resize(size, &zero); @@ -145,7 +171,15 @@ public: * \param get_buff the getter function */ void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + if (_worker_threads[xport_chan]) + { + _worker_thread_group.remove_thread(_worker_threads[xport_chan]); + _worker_data[xport_chan]->stop = true; + _worker_threads[xport_chan]->join(); + _worker_data[xport_chan]->stop = false; + } _props.at(xport_chan).get_buff = get_buff; + _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan)); } //! 
Set the conversion routine for all channels @@ -381,63 +415,147 @@ private: if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t); if_packet_info.packet_count = _next_packet_seq; - //get a buffer for each channel or timeout - BOOST_FOREACH(xport_chan_props_type &props, _props){ - if (not props.buff) props.buff = props.get_buff(timeout); - if (not props.buff) return 0; //timeout + // wait for all worker threads to be ready or timeout + boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000)); + for (size_t i = 0; i < this->size(); i++) + { + while (not _worker_data[i]->ready) + { + if (boost::get_system_time() > expiration) + { + return 0; + } + } + _worker_data[i]->ready = false; } - //setup the data to share with converter threads + //setup the data to share with worker threads _convert_nsamps = nsamps_per_buff; _convert_buffs = &buffs; _convert_buffer_offset_bytes = buffer_offset_bytes; _convert_if_packet_info = &if_packet_info; - //perform N channels of conversion - for (size_t i = 0; i < this->size(); i++) { - convert_to_in_buff(i); + //start N channels of conversion + for (size_t i = 0; i < this->size(); i++) + { + _worker_data[i]->go = true; + } + + //make sure any sleeping worker threads are woken up + for (size_t i = 0; i < this->size(); i++) + { + // Acquiring the lock used by the condition variable + // takes too long, so do a spin wait. If the go flag + // is not cleared by this point, it will be cleared + // immediately by the worker thread when it wakes up. 
+ while (_worker_data[i]->go) + { + _worker_data[i]->data_ready.notify_one(); + } + } + + //wait for all worker threads to be done + for (size_t i = 0; i < this->size(); i++) + { + //TODO: Implement a better wait strategy + //busy loop give fastest response, but these are just wasted cycles + while (not _worker_data[i]->done) {} + _worker_data[i]->done = false; } _next_packet_seq++; //increment sequence after commits return nsamps_per_buff; } - /*! Run the conversion from the internal buffers to the user's input - * buffer. + /*! Worker thread routine. * + * - Gets an internal data buffer * - Calls the converter * - Releases internal data buffers - * - Updates read/write pointers */ - UHD_INLINE void convert_to_in_buff(const size_t index) + void worker(const size_t index) { - //shortcut references to local data structures - managed_send_buffer::sptr &buff = _props[index].buff; - vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info; - const tx_streamer::buffs_type &buffs = *_convert_buffs; - - //fill IO buffs with pointers into the output buffer - const void *io_buffs[4/*max interleave*/]; - for (size_t i = 0; i < _num_inputs; i++){ - const char *b = reinterpret_cast(buffs[index*_num_inputs + i]); - io_buffs[i] = b + _convert_buffer_offset_bytes; + //maximum number of cycles to spin before waiting on condition variable + //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop + //the assumption is that anything held up for 15ms can wait + static const size_t MAX_SPIN_CYCLES = 30000000; + + //maximum amount of time to wait before checking the stop flag + static const double MAX_WAIT = 0.1; + + managed_send_buffer::sptr buff; + vrt::if_packet_info_t if_packet_info; + std::vector in_buffs(MAX_INTERLEAVE); + boost::shared_ptr worker_data = _worker_data[index]; + boost::unique_lock lock(worker_data->data_ready_lock); + size_t spins = 0; + + while (not worker_data->stop) + { + if (not buff) + { + buff = 
_props[index].get_buff(MAX_WAIT); + if (not buff) + { + continue; + } + worker_data->ready = true; + } + + //make sure done flag is cleared by controlling thread before waiting on go signal + if (worker_data->done) + { + continue; + } + + //partial spin lock before wait + while (not worker_data->go and spins < MAX_SPIN_CYCLES) + { + spins++; + } + if (not worker_data->go and + not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000)))) + { + continue; + } + // Clear the go flag immediately to let the + // controlling thread know we are not sleeping. + worker_data->go = false; + + //reset the spin count + spins = 0; + + //pack metadata into a vrt header + uint32_t *otw_mem = buff->cast() + _header_offset_words32; + if_packet_info = *_convert_if_packet_info; + if_packet_info.has_sid = _props[index].has_sid; + if_packet_info.sid = _props[index].sid; + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //prepare the input buffers + for (size_t i = 0; i < _num_inputs; i++) + { + in_buffs[i] = + (reinterpret_cast((*_convert_buffs)[index*_num_inputs + i])) + + _convert_buffer_offset_bytes; + } + + //perform the conversion operation + _converter->conv(in_buffs, otw_mem, _convert_nsamps); + + //let the master know that new data can be prepared + _worker_data[index]->done = true; + + //commit the samples to the zero-copy interface + buff->commit( + (_header_offset_words32 + if_packet_info.num_packet_words32) + * sizeof(uint32_t) + ); + + //release the buffer + buff.reset(); } - const ref_vector in_buffs(io_buffs, _num_inputs); - - //pack metadata into a vrt header - uint32_t *otw_mem = buff->cast() + _header_offset_words32; - if_packet_info.has_sid = _props[index].has_sid; - if_packet_info.sid = _props[index].sid; - _vrt_packer(otw_mem, if_packet_info); - otw_mem += if_packet_info.num_header_words32; - - //perform the conversion operation - _converter->conv(in_buffs, otw_mem, _convert_nsamps); - - 
//commit the samples to the zero-copy interface - const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32; - buff->commit(num_vita_words32*sizeof(uint32_t)); - buff.reset(); //effectively a release } //! Shared variables for the worker threads @@ -445,7 +563,18 @@ private: const tx_streamer::buffs_type *_convert_buffs; size_t _convert_buffer_offset_bytes; vrt::if_packet_info_t *_convert_if_packet_info; - + struct worker_thread_data_t { + worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {} + boost::atomic_bool ready; + boost::atomic_bool go; + boost::atomic_bool done; + boost::atomic_bool stop; + boost::mutex data_ready_lock; + boost::condition_variable data_ready; + }; + std::vector< boost::shared_ptr > _worker_data; + boost::thread_group _worker_thread_group; + std::vector _worker_threads; }; class send_packet_streamer : public send_packet_handler, public tx_streamer{ -- cgit v1.2.3 From 748689ae5402b154d78b60d61b67cb96b50d7916 Mon Sep 17 00:00:00 2001 From: Michael West Date: Wed, 29 Mar 2017 13:24:32 -0700 Subject: X300: Increase FW control ACK timeout - Fixes issue of fw_communication timeout errors on (Windows) systems where clock ticks are >10ms --- host/lib/usrp/x300/x300_fw_ctrl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'host/lib') diff --git a/host/lib/usrp/x300/x300_fw_ctrl.cpp b/host/lib/usrp/x300/x300_fw_ctrl.cpp index 080d235a4..d149dadf3 100644 --- a/host/lib/usrp/x300/x300_fw_ctrl.cpp +++ b/host/lib/usrp/x300/x300_fw_ctrl.cpp @@ -292,7 +292,7 @@ protected: private: niriok_proxy::sptr _drv_proxy; - static const uint32_t READ_TIMEOUT_IN_MS = 10; + static const uint32_t READ_TIMEOUT_IN_MS = 100; static const uint32_t INIT_TIMEOUT_IN_MS = 5000; }; -- cgit v1.2.3