aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r-- host/CMakeLists.txt 4
-rw-r--r-- host/include/uhd/transport/bounded_buffer.ipp 89
-rw-r--r-- host/include/uhd/transport/zero_copy_flow_ctrl.hpp 58
-rw-r--r-- host/lib/transport/CMakeLists.txt 1
-rw-r--r-- host/lib/transport/nirio_zero_copy.cpp 23
-rw-r--r-- host/lib/transport/super_send_packet_handler.hpp 211
-rw-r--r-- host/lib/transport/zero_copy_flow_ctrl.cpp 227
-rw-r--r-- host/lib/usrp/device3/device3_io_impl.cpp 68
-rw-r--r-- host/lib/usrp/x300/x300_fw_ctrl.cpp 2
-rw-r--r-- host/lib/usrp/x300/x300_impl.cpp 8
-rw-r--r-- host/lib/usrp/x300/x300_impl.hpp 21
11 files changed, 597 insertions, 115 deletions
diff --git a/host/CMakeLists.txt b/host/CMakeLists.txt
index 8def96811..21706d580 100644
--- a/host/CMakeLists.txt
+++ b/host/CMakeLists.txt
@@ -358,8 +358,8 @@ UHD_INSTALL(FILES
#{{{IMG_SECTION
# This section is written automatically by /images/create_imgs_package.py
# Any manual changes in here will be overwritten.
-SET(UHD_IMAGES_MD5SUM "53a1ea139d8344fec9914f05db79bdd0")
-SET(UHD_IMAGES_DOWNLOAD_SRC "uhd-images_003.010.001.001-7-g63fcfb95.zip")
+SET(UHD_IMAGES_MD5SUM "487beb2dd2477ad90f9b21399931979c")
+SET(UHD_IMAGES_DOWNLOAD_SRC "uhd-images_003.010.001.001-27-g47672ede.zip")
#}}}
########################################################################
diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp
index 35ffb293b..daca3f04f 100644
--- a/host/include/uhd/transport/bounded_buffer.ipp
+++ b/host/include/uhd/transport/bounded_buffer.ipp
@@ -28,7 +28,8 @@
namespace uhd{ namespace transport{
- template <typename elem_type> class bounded_buffer_detail : boost::noncopyable{
+ template <typename elem_type> class bounded_buffer_detail : boost::noncopyable
+ {
public:
bounded_buffer_detail(size_t capacity):
@@ -38,79 +39,97 @@ namespace uhd{ namespace transport{
_not_empty_fcn = boost::bind(&bounded_buffer_detail<elem_type>::not_empty, this);
}
- UHD_INLINE bool push_with_haste(const elem_type &elem){
+ UHD_INLINE bool push_with_haste(const elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (_buffer.full()) return false;
+ if (_buffer.full())
+ {
+ return false;
+ }
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return true;
}
- UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){
+ UHD_INLINE bool push_with_pop_on_full(const elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (_buffer.full()){
+ if (_buffer.full())
+ {
_buffer.pop_back();
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return false;
}
- else{
+ else {
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return true;
}
}
- UHD_INLINE void push_with_wait(const elem_type &elem){
- if (this->push_with_haste(elem)) return;
+ UHD_INLINE void push_with_wait(const elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- _full_cond.wait(lock, _not_full_fcn);
+ if (_buffer.full())
+ {
+ _full_cond.wait(lock, _not_full_fcn);
+ }
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
}
- UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){
- if (this->push_with_haste(elem)) return true;
+ UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (not _full_cond.timed_wait(
- lock, to_time_dur(timeout), _not_full_fcn
- )) return false;
+ if (_buffer.full())
+ {
+ if (not _full_cond.timed_wait(lock,
+ to_time_dur(timeout), _not_full_fcn))
+ {
+ return false;
+ }
+ }
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return true;
}
- UHD_INLINE bool pop_with_haste(elem_type &elem){
+ UHD_INLINE bool pop_with_haste(elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (_buffer.empty()) return false;
+ if (_buffer.empty())
+ {
+ return false;
+ }
this->pop_back(elem);
- lock.unlock();
_full_cond.notify_one();
return true;
}
- UHD_INLINE void pop_with_wait(elem_type &elem){
- if (this->pop_with_haste(elem)) return;
+ UHD_INLINE void pop_with_wait(elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- _empty_cond.wait(lock, _not_empty_fcn);
+ if (_buffer.empty())
+ {
+ _empty_cond.wait(lock, _not_empty_fcn);
+ }
this->pop_back(elem);
- lock.unlock();
_full_cond.notify_one();
}
- UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){
- if (this->pop_with_haste(elem)) return true;
+ UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (not _empty_cond.timed_wait(
- lock, to_time_dur(timeout), _not_empty_fcn
- )) return false;
+ if (_buffer.empty())
+ {
+ if (not _empty_cond.timed_wait(lock, to_time_dur(timeout),
+ _not_empty_fcn))
+ {
+ return false;
+ }
+ }
this->pop_back(elem);
- lock.unlock();
_full_cond.notify_one();
return true;
}
@@ -131,13 +150,15 @@ namespace uhd{ namespace transport{
* 2) assign the back element to empty
* 3) pop the back to move the counter
*/
- UHD_INLINE void pop_back(elem_type &elem){
+ UHD_INLINE void pop_back(elem_type &elem)
+ {
elem = _buffer.back();
_buffer.back() = elem_type();
_buffer.pop_back();
}
- static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
+ static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout)
+ {
return boost::posix_time::microseconds(long(timeout*1e6));
}
diff --git a/host/include/uhd/transport/zero_copy_flow_ctrl.hpp b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp
new file mode 100644
index 000000000..8075c503d
--- /dev/null
+++ b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp
@@ -0,0 +1,58 @@
+//
+// Copyright 2017 Ettus Research
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_ZERO_COPY_FLOW_CTRL_HPP
+#define INCLUDED_ZERO_COPY_FLOW_CTRL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace uhd{ namespace transport{
+
+/*!
+ * Flow control function.
+ * \param buff buffer to be sent or receive buffer being released
+ * \return true if OK, false if not
+ */
+typedef boost::function<bool(managed_buffer::sptr buff)> flow_ctrl_func;
+
+/*!
+ * Adds flow control to any zero_copy_if transport.
+ */
+class UHD_API zero_copy_flow_ctrl : public virtual zero_copy_if {
+public:
+ typedef boost::shared_ptr<zero_copy_flow_ctrl> sptr;
+
+ /*!
+ * Make flow controlled transport.
+ *
+ * \param transport a shared pointer to the transport interface
+ * \param send_flow_ctrl optional send flow control function called before buffer is sent
+ * \param recv_flow_ctrl optional receive flow control function called after buffer released
+ */
+ static sptr make(
+ zero_copy_if::sptr transport,
+ flow_ctrl_func send_flow_ctrl,
+ flow_ctrl_func recv_flow_ctrl
+ );
+};
+
+}} //namespace
+
+#endif /* INCLUDED_ZERO_COPY_FLOW_CTRL_HPP */
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 44c8d59af..a6d84cc4a 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -129,6 +129,7 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp
${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp
${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp
)
IF(ENABLE_X300)
diff --git a/host/lib/transport/nirio_zero_copy.cpp b/host/lib/transport/nirio_zero_copy.cpp
index 3c52e5080..14118d393 100644
--- a/host/lib/transport/nirio_zero_copy.cpp
+++ b/host/lib/transport/nirio_zero_copy.cpp
@@ -26,6 +26,7 @@
#include <boost/make_shared.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp> //sleep
+#include <boost/interprocess/mapped_region.hpp> //get_page_size()
#include <vector>
#include <algorithm> // std::max
//@TODO: Move the register defs required by the class to a common location
@@ -350,6 +351,7 @@ nirio_zero_copy::sptr nirio_zero_copy::make(
){
//Initialize xport_params
zero_copy_xport_params xport_params = default_buff_args;
+ size_t page_size = boost::interprocess::mapped_region::get_page_size();
//The kernel buffer for this transport must be (num_frames * frame_size) big. Unlike ethernet,
//where the kernel buffer size is independent of the circular buffer size for the transport,
@@ -386,6 +388,22 @@ nirio_zero_copy::sptr nirio_zero_copy::make(
size_t usr_send_buff_size = static_cast<size_t>(
hints.cast<double>("send_buff_size", default_buff_args.num_send_frames));
+ if (hints.has_key("send_buff_size"))
+ {
+ if (usr_send_buff_size % page_size != 0)
+ {
+ throw uhd::value_error((boost::format("send_buff_size must be multiple of %d") % page_size).str());
+ }
+ }
+
+ if (hints.has_key("send_frame_size") and hints.has_key("num_send_frames"))
+ {
+ if (usr_num_send_frames * xport_params.send_frame_size % page_size != 0)
+ {
+ throw uhd::value_error((boost::format("num_send_frames * send_frame_size must be an even multiple of %d") % page_size).str());
+ }
+ }
+
if (hints.has_key("num_send_frames") and hints.has_key("send_buff_size")) {
if (usr_send_buff_size < xport_params.send_frame_size)
throw uhd::value_error("send_buff_size must be equal to or greater than (num_send_frames * send_frame_size)");
@@ -400,6 +418,11 @@ nirio_zero_copy::sptr nirio_zero_copy::make(
xport_params.num_send_frames = usr_num_send_frames;
}
+ if (xport_params.num_send_frames * xport_params.send_frame_size % page_size != 0)
+ {
+ throw uhd::value_error((boost::format("num_send_frames * send_frame_size must be an even multiple of %d") % page_size).str());
+ }
+
return nirio_zero_copy::sptr(new nirio_zero_copy_impl(fpga_session, instance, xport_params));
}
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 8ad76fa39..003689a78 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -29,9 +29,12 @@
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
+#include <uhd/utils/safe_call.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/thread_time.hpp>
#include <boost/function.hpp>
+#include <boost/atomic.hpp>
+#include <boost/make_shared.hpp>
#include <iostream>
#include <vector>
@@ -48,6 +51,9 @@ namespace uhd {
namespace transport {
namespace sph {
+static const size_t MAX_INTERLEAVE = 4;
+static const double GET_BUFF_TIMEOUT = 0.1;
+
/***********************************************************************
* Super send packet handler
*
@@ -67,19 +73,39 @@ public:
* \param size the number of transport channels
*/
send_packet_handler(const size_t size = 1):
- _next_packet_seq(0), _cached_metadata(false)
+ _next_packet_seq(0), _cached_metadata(false)
{
this->set_enable_trailer(true);
this->resize(size);
}
~send_packet_handler(void){
- /* NOP */
+ UHD_SAFE_CALL(
+ for (size_t i = 0; i < _worker_data.size(); i++)
+ {
+ _worker_data[i]->stop = true;
+ }
+ _worker_thread_group.join_all();
+ );
}
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
+
+ // Stop all worker threads
+ for (size_t i = 0; i < _worker_data.size(); i++)
+ {
+ _worker_data[i]->stop = true;
+ }
+ _worker_thread_group.join_all();
+ _worker_threads.resize(size);
+ _worker_data.resize(size);
+ for (size_t i = 0; i < size; i++)
+ {
+ _worker_data[i] = boost::make_shared<worker_thread_data_t>();
+ }
+
_props.resize(size);
static const uint64_t zero = 0;
_zero_buffs.resize(size, &zero);
@@ -144,7 +170,15 @@ public:
* \param get_buff the getter function
*/
void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){
+ if (_worker_threads[xport_chan])
+ {
+ _worker_thread_group.remove_thread(_worker_threads[xport_chan]);
+ _worker_data[xport_chan]->stop = true;
+ _worker_threads[xport_chan]->join();
+ _worker_data[xport_chan]->stop = false;
+ }
_props.at(xport_chan).get_buff = get_buff;
+ _worker_threads[xport_chan] = _worker_thread_group.create_thread(boost::bind(&send_packet_handler::worker, this, xport_chan));
}
//! Set the conversion routine for all channels
@@ -380,63 +414,147 @@ private:
if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(uint32_t);
if_packet_info.packet_count = _next_packet_seq;
- //get a buffer for each channel or timeout
- for(xport_chan_props_type &props: _props){
- if (not props.buff) props.buff = props.get_buff(timeout);
- if (not props.buff) return 0; //timeout
+ // wait for all worker threads to be ready or timeout
+ boost::system_time expiration = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout * 1000));
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ while (not _worker_data[i]->ready)
+ {
+ if (boost::get_system_time() > expiration)
+ {
+ return 0;
+ }
+ }
+ _worker_data[i]->ready = false;
}
- //setup the data to share with converter threads
+ //setup the data to share with worker threads
_convert_nsamps = nsamps_per_buff;
_convert_buffs = &buffs;
_convert_buffer_offset_bytes = buffer_offset_bytes;
_convert_if_packet_info = &if_packet_info;
- //perform N channels of conversion
- for (size_t i = 0; i < this->size(); i++) {
- convert_to_in_buff(i);
+ //start N channels of conversion
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ _worker_data[i]->go = true;
+ }
+
+ //make sure any sleeping worker threads are woken up
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ // Acquiring the lock used by the condition variable
+ // takes too long, so do a spin wait. If the go flag
+ // is not cleared by this point, it will be cleared
+ // immediately by the worker thread when it wakes up.
+ while (_worker_data[i]->go)
+ {
+ _worker_data[i]->data_ready.notify_one();
+ }
+ }
+
+ //wait for all worker threads to be done
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ //TODO: Implement a better wait strategy
+ //a busy loop gives the fastest response, but these are just wasted cycles
+ while (not _worker_data[i]->done) {}
+ _worker_data[i]->done = false;
}
_next_packet_seq++; //increment sequence after commits
return nsamps_per_buff;
}
- /*! Run the conversion from the internal buffers to the user's input
- * buffer.
+ /*! Worker thread routine.
*
+ * - Gets an internal data buffer
* - Calls the converter
* - Releases internal data buffers
- * - Updates read/write pointers
*/
- UHD_INLINE void convert_to_in_buff(const size_t index)
+ void worker(const size_t index)
{
- //shortcut references to local data structures
- managed_send_buffer::sptr &buff = _props[index].buff;
- vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
- const tx_streamer::buffs_type &buffs = *_convert_buffs;
-
- //fill IO buffs with pointers into the output buffer
- const void *io_buffs[4/*max interleave*/];
- for (size_t i = 0; i < _num_inputs; i++){
- const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]);
- io_buffs[i] = b + _convert_buffer_offset_bytes;
+ //maximum number of cycles to spin before waiting on condition variable
+ //the value of 30000000 was derived from 15ms on a 10 GHz CPU divided by 5 cycles per loop
+ //the assumption is that anything held up for 15ms can wait
+ static const size_t MAX_SPIN_CYCLES = 30000000;
+
+ //maximum amount of time to wait before checking the stop flag
+ static const double MAX_WAIT = 0.1;
+
+ managed_send_buffer::sptr buff;
+ vrt::if_packet_info_t if_packet_info;
+ std::vector<const void *> in_buffs(MAX_INTERLEAVE);
+ boost::shared_ptr<worker_thread_data_t> worker_data = _worker_data[index];
+ boost::unique_lock<boost::mutex> lock(worker_data->data_ready_lock);
+ size_t spins = 0;
+
+ while (not worker_data->stop)
+ {
+ if (not buff)
+ {
+ buff = _props[index].get_buff(MAX_WAIT);
+ if (not buff)
+ {
+ continue;
+ }
+ worker_data->ready = true;
+ }
+
+ //make sure done flag is cleared by controlling thread before waiting on go signal
+ if (worker_data->done)
+ {
+ continue;
+ }
+
+ //partial spin lock before wait
+ while (not worker_data->go and spins < MAX_SPIN_CYCLES)
+ {
+ spins++;
+ }
+ if (not worker_data->go and
+ not worker_data->data_ready.timed_wait(lock, boost::posix_time::milliseconds(long(MAX_WAIT*1000))))
+ {
+ continue;
+ }
+ // Clear the go flag immediately to let the
+ // controlling thread know we are not sleeping.
+ worker_data->go = false;
+
+ //reset the spin count
+ spins = 0;
+
+ //pack metadata into a vrt header
+ uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32;
+ if_packet_info = *_convert_if_packet_info;
+ if_packet_info.has_sid = _props[index].has_sid;
+ if_packet_info.sid = _props[index].sid;
+ _vrt_packer(otw_mem, if_packet_info);
+ otw_mem += if_packet_info.num_header_words32;
+
+ //prepare the input buffers
+ for (size_t i = 0; i < _num_inputs; i++)
+ {
+ in_buffs[i] =
+ (reinterpret_cast<const char *>((*_convert_buffs)[index*_num_inputs + i]))
+ + _convert_buffer_offset_bytes;
+ }
+
+ //perform the conversion operation
+ _converter->conv(in_buffs, otw_mem, _convert_nsamps);
+
+ //let the master know that new data can be prepared
+ _worker_data[index]->done = true;
+
+ //commit the samples to the zero-copy interface
+ buff->commit(
+ (_header_offset_words32 + if_packet_info.num_packet_words32)
+ * sizeof(uint32_t)
+ );
+
+ //release the buffer
+ buff.reset();
}
- const ref_vector<const void *> in_buffs(io_buffs, _num_inputs);
-
- //pack metadata into a vrt header
- uint32_t *otw_mem = buff->cast<uint32_t *>() + _header_offset_words32;
- if_packet_info.has_sid = _props[index].has_sid;
- if_packet_info.sid = _props[index].sid;
- _vrt_packer(otw_mem, if_packet_info);
- otw_mem += if_packet_info.num_header_words32;
-
- //perform the conversion operation
- _converter->conv(in_buffs, otw_mem, _convert_nsamps);
-
- //commit the samples to the zero-copy interface
- const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
- buff->commit(num_vita_words32*sizeof(uint32_t));
- buff.reset(); //effectively a release
}
//! Shared variables for the worker threads
@@ -444,7 +562,18 @@ private:
const tx_streamer::buffs_type *_convert_buffs;
size_t _convert_buffer_offset_bytes;
vrt::if_packet_info_t *_convert_if_packet_info;
-
+ struct worker_thread_data_t {
+ worker_thread_data_t() : ready(false), go(false), done(false), stop(false) {}
+ boost::atomic_bool ready;
+ boost::atomic_bool go;
+ boost::atomic_bool done;
+ boost::atomic_bool stop;
+ boost::mutex data_ready_lock;
+ boost::condition_variable data_ready;
+ };
+ std::vector< boost::shared_ptr<worker_thread_data_t> > _worker_data;
+ boost::thread_group _worker_thread_group;
+ std::vector<boost::thread *> _worker_threads;
};
class send_packet_streamer : public send_packet_handler, public tx_streamer{
diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp
new file mode 100644
index 000000000..06d7934e2
--- /dev/null
+++ b/host/lib/transport/zero_copy_flow_ctrl.cpp
@@ -0,0 +1,227 @@
+//
+// Copyright 2017 Ettus Research
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/zero_copy_flow_ctrl.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/safe_call.hpp>
+#include <boost/format.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/bind.hpp>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+typedef bounded_buffer<managed_send_buffer::sptr> bounded_buffer_t;
+
+class zero_copy_flow_ctrl_msb : public managed_send_buffer
+{
+public:
+ zero_copy_flow_ctrl_msb(
+ flow_ctrl_func flow_ctrl
+ ) :
+ _mb(NULL),
+ _flow_ctrl(flow_ctrl)
+ {
+ /* NOP */
+ }
+
+ ~zero_copy_flow_ctrl_msb()
+ {
+ /* NOP */
+ }
+
+ void release()
+ {
+ if (_mb)
+ {
+ _mb->commit(size());
+ while (_flow_ctrl and not _flow_ctrl(_mb)) {}
+ _mb.reset();
+ }
+ }
+
+ UHD_INLINE sptr get(sptr &mb)
+ {
+ _mb = mb;
+ return make(this, _mb->cast<void *>(), _mb->size());
+ }
+
+private:
+ sptr _mb;
+ flow_ctrl_func _flow_ctrl;
+};
+
+class zero_copy_flow_ctrl_mrb : public managed_recv_buffer
+{
+public:
+ zero_copy_flow_ctrl_mrb(
+ flow_ctrl_func flow_ctrl
+ ) :
+ _mb(NULL),
+ _flow_ctrl(flow_ctrl)
+ {
+ /* NOP */
+ }
+
+ ~zero_copy_flow_ctrl_mrb()
+ {
+ /* NOP */
+ }
+
+ void release()
+ {
+ if (_mb)
+ {
+ _mb->commit(size());
+ while (_flow_ctrl and not _flow_ctrl(_mb)) {}
+ _mb.reset();
+ }
+ }
+
+ UHD_INLINE sptr get(sptr &mb)
+ {
+ _mb = mb;
+ return make(this, _mb->cast<void *>(), _mb->size());
+ }
+
+private:
+ sptr _mb;
+ flow_ctrl_func _flow_ctrl;
+};
+
+/***********************************************************************
+ * Zero copy flow control transport:
+ * An intermediate transport that wraps another transport and invokes
+ * the flow control functions when its send/recv buffers are released.
+ **********************************************************************/
+class zero_copy_flow_ctrl_impl : public zero_copy_flow_ctrl {
+public:
+ typedef boost::shared_ptr<zero_copy_flow_ctrl_impl> sptr;
+
+ zero_copy_flow_ctrl_impl(zero_copy_if::sptr transport,
+ flow_ctrl_func send_flow_ctrl,
+ flow_ctrl_func recv_flow_ctrl) :
+ _transport(transport),
+ _send_buffers(transport->get_num_send_frames()),
+ _recv_buffers(transport->get_num_recv_frames()),
+ _send_buff_index(0),
+ _recv_buff_index(0),
+ _send_flow_ctrl(send_flow_ctrl),
+ _recv_flow_ctrl(recv_flow_ctrl)
+ {
+ UHD_LOG << "Created zero_copy_flow_ctrl" << std::endl;
+
+ for (size_t i = 0; i < transport->get_num_send_frames(); i++)
+ {
+ _send_buffers[i] = boost::make_shared<zero_copy_flow_ctrl_msb>(_send_flow_ctrl);
+ }
+ for (size_t i = 0; i < transport->get_num_recv_frames(); i++)
+ {
+ _recv_buffers[i] = boost::make_shared<zero_copy_flow_ctrl_mrb>(_recv_flow_ctrl);
+ }
+ }
+
+ ~zero_copy_flow_ctrl_impl()
+ {
+ }
+
+ /*******************************************************************
+ * Receive implementation:
+ * Pop the receive buffer pointer from the underlying transport
+ ******************************************************************/
+ UHD_INLINE managed_recv_buffer::sptr get_recv_buff(double timeout)
+ {
+ managed_recv_buffer::sptr ptr;
+ managed_recv_buffer::sptr buff = _transport->get_recv_buff(timeout);
+ if (buff)
+ {
+ boost::shared_ptr<zero_copy_flow_ctrl_mrb> mb = _recv_buffers[_recv_buff_index++];
+ _recv_buff_index %= _recv_buffers.size();
+ ptr = mb->get(buff);
+ }
+ return ptr;
+ }
+
+ UHD_INLINE size_t get_num_recv_frames() const
+ {
+ return _transport->get_num_recv_frames();
+ }
+
+ UHD_INLINE size_t get_recv_frame_size() const
+ {
+ return _transport->get_recv_frame_size();
+ }
+
+ /*******************************************************************
+ * Send implementation:
+ * Pass the send buffer pointer from the underlying transport
+ ******************************************************************/
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ managed_send_buffer::sptr ptr;
+ managed_send_buffer::sptr buff = _transport->get_send_buff(timeout);
+ if (buff)
+ {
+ boost::shared_ptr<zero_copy_flow_ctrl_msb> mb = _send_buffers[_send_buff_index++];
+ _send_buff_index %= _send_buffers.size();
+ ptr = mb->get(buff);
+ }
+ return ptr;
+ }
+
+ UHD_INLINE size_t get_num_send_frames() const
+ {
+ return _transport->get_num_send_frames();
+ }
+
+ UHD_INLINE size_t get_send_frame_size() const
+ {
+ return _transport->get_send_frame_size();
+ }
+
+private:
+ // The underlying transport
+ zero_copy_if::sptr _transport;
+
+ // buffers
+ std::vector< boost::shared_ptr<zero_copy_flow_ctrl_msb> > _send_buffers;
+ std::vector< boost::shared_ptr<zero_copy_flow_ctrl_mrb> > _recv_buffers;
+ size_t _send_buff_index;
+ size_t _recv_buff_index;
+
+ // Flow control functions
+ flow_ctrl_func _send_flow_ctrl;
+ flow_ctrl_func _recv_flow_ctrl;
+};
+
+zero_copy_flow_ctrl::sptr zero_copy_flow_ctrl::make(
+ zero_copy_if::sptr transport,
+ flow_ctrl_func send_flow_ctrl,
+ flow_ctrl_func recv_flow_ctrl
+)
+{
+ zero_copy_flow_ctrl_impl::sptr zero_copy_flow_ctrl(
+ new zero_copy_flow_ctrl_impl(transport, send_flow_ctrl, recv_flow_ctrl)
+ );
+
+ return zero_copy_flow_ctrl;
+}
diff --git a/host/lib/usrp/device3/device3_io_impl.cpp b/host/lib/usrp/device3/device3_io_impl.cpp
index 8a42fe148..46cc0ee5a 100644
--- a/host/lib/usrp/device3/device3_io_impl.cpp
+++ b/host/lib/usrp/device3/device3_io_impl.cpp
@@ -31,6 +31,8 @@
#include "../../rfnoc/tx_stream_terminator.hpp"
#include <uhd/rfnoc/rate_node_ctrl.hpp>
#include <uhd/rfnoc/radio_ctrl.hpp>
+#include <uhd/transport/zero_copy_flow_ctrl.hpp>
+#include <boost/atomic.hpp>
#define UHD_TX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER")
#define UHD_RX_STREAMER_LOG() UHD_LOGGER_TRACE("STREAMER")
@@ -305,12 +307,13 @@ struct tx_fc_cache_t
device_channel(0),
last_seq_out(0),
last_seq_ack(0),
- seq_queue(1){}
+ last_seq_ack_cache(0) {}
+
size_t stream_channel;
size_t device_channel;
size_t last_seq_out;
- size_t last_seq_ack;
- uhd::transport::bounded_buffer<size_t> seq_queue;
+ boost::atomic_size_t last_seq_ack;
+ size_t last_seq_ack_cache;
boost::shared_ptr<device3_impl::async_md_type> async_queue;
boost::shared_ptr<device3_impl::async_md_type> old_async_queue;
};
@@ -338,33 +341,43 @@ static size_t get_tx_flow_control_window(
return window_in_pkts;
}
-static managed_send_buffer::sptr get_tx_buff_with_flowctrl(
- task::sptr /*holds ref*/,
- boost::shared_ptr<tx_fc_cache_t> fc_cache,
+// TODO: Remove this function
+// This function only exists to make sure the transport is not destroyed
+// until it is no longer needed.
+static managed_send_buffer::sptr get_tx_buff(
zero_copy_if::sptr xport,
- size_t fc_window,
const double timeout
){
+ return xport->get_send_buff(timeout);
+}
+
+static bool tx_flow_ctrl(
+ task::sptr /*holds ref*/,
+ boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ size_t fc_window,
+ managed_buffer::sptr
+) {
+ // Busy loop waiting for flow control update. This is necessary because
+ // at this point there is data trying to be sent and it must be sent as
+ // quickly as possible when the flow control update arrives to avoid
+ // underruns at high rates. This is also OK because it only occurs when
+ // data needs to be sent and flow control is holding it back.
while (true)
{
// delta is the amount of FC credit we've used up
- const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) - (fc_cache->last_seq_ack & HW_SEQ_NUM_MASK);
+ const size_t delta = (fc_cache->last_seq_out & HW_SEQ_NUM_MASK) -
+ (fc_cache->last_seq_ack_cache & HW_SEQ_NUM_MASK);
// If we want to send another packet, we must have FC credit left
if ((delta & HW_SEQ_NUM_MASK) < fc_window)
- break;
-
- // If credit is all used up, we check seq_queue for more.
- const bool ok = fc_cache->seq_queue.pop_with_timed_wait(fc_cache->last_seq_ack, timeout);
- if (not ok) {
- return managed_send_buffer::sptr(); //timeout waiting for flow control
+ {
+ // Packet will be sent
+ fc_cache->last_seq_out++; //update seq
+ return true;
}
+ // update the cached value from the atomic
+ fc_cache->last_seq_ack_cache = fc_cache->last_seq_ack;
}
-
- managed_send_buffer::sptr buff = xport->get_send_buff(timeout);
- if (buff) {
- fc_cache->last_seq_out++; //update seq, this will actually be a send
- }
- return buff;
+ return false;
}
#define DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL 0
@@ -381,7 +394,9 @@ static void handle_tx_async_msgs(
) {
managed_recv_buffer::sptr buff = xport->get_recv_buff();
if (not buff)
+ {
return;
+ }
//extract packet info
vrt::if_packet_info_t if_packet_info;
@@ -430,8 +445,7 @@ static void handle_tx_async_msgs(
//The FC response and the burst ack are two indicators that the radio
//consumed packets. Use them to update the FC metadata
if (metadata.event_code == DEVICE3_ASYNC_EVENT_CODE_FLOW_CTRL) {
- const size_t seq = metadata.user_payload[0];
- fc_cache->seq_queue.push_with_pop_on_full(seq);
+ fc_cache->last_seq_ack = metadata.user_payload[0];
}
//FC responses don't propagate up to the user so filter them here
@@ -842,13 +856,19 @@ tx_streamer::sptr device3_impl::get_tx_stream(const uhd::stream_args_t &args_)
}
}
+ // Add flow control
+ xport.send = zero_copy_flow_ctrl::make(
+ xport.send,
+ boost::bind(&tx_flow_ctrl, task, fc_cache, fc_window, _1),
+ NULL);
+
//Give the streamer a functor to get the send buffer
- //get_tx_buff_with_flowctrl is static so bind has no lifetime issues
+ //get_tx_buff is static so bind has no lifetime issues
//xport.send (sptr) is required to add streamer->data-transport lifetime dependency
//task (sptr) is required to add a streamer->async-handler lifetime dependency
my_streamer->set_xport_chan_get_buff(
stream_i,
- boost::bind(&get_tx_buff_with_flowctrl, task, fc_cache, xport.send, fc_window, _1)
+ boost::bind(&get_tx_buff, xport.send, _1)
);
//Give the streamer a functor handled received async messages
my_streamer->set_async_receiver(
diff --git a/host/lib/usrp/x300/x300_fw_ctrl.cpp b/host/lib/usrp/x300/x300_fw_ctrl.cpp
index 1df0fa611..5ff40c966 100644
--- a/host/lib/usrp/x300/x300_fw_ctrl.cpp
+++ b/host/lib/usrp/x300/x300_fw_ctrl.cpp
@@ -292,7 +292,7 @@ protected:
private:
niriok_proxy::sptr _drv_proxy;
- static const uint32_t READ_TIMEOUT_IN_MS = 10;
+ static const uint32_t READ_TIMEOUT_IN_MS = 100;
static const uint32_t INIT_TIMEOUT_IN_MS = 5000;
};
diff --git a/host/lib/usrp/x300/x300_impl.cpp b/host/lib/usrp/x300/x300_impl.cpp
index 785f7b4a3..ac08cf565 100644
--- a/host/lib/usrp/x300/x300_impl.cpp
+++ b/host/lib/usrp/x300/x300_impl.cpp
@@ -1128,14 +1128,14 @@ uhd::both_xports_t x300_impl::make_transport(
? X300_PCIE_RX_DATA_FRAME_SIZE
: X300_PCIE_MSG_FRAME_SIZE;
- default_buff_args.num_send_frames =
- (xport_type == TX_DATA)
- ? X300_PCIE_DATA_NUM_FRAMES
+ default_buff_args.num_send_frames =
+ (xport_type == TX_DATA)
+ ? X300_PCIE_TX_DATA_NUM_FRAMES
: X300_PCIE_MSG_NUM_FRAMES;
default_buff_args.num_recv_frames =
(xport_type == RX_DATA)
- ? X300_PCIE_DATA_NUM_FRAMES
+ ? X300_PCIE_RX_DATA_NUM_FRAMES
: X300_PCIE_MSG_NUM_FRAMES;
xports.recv = nirio_zero_copy::make(
diff --git a/host/lib/usrp/x300/x300_impl.hpp b/host/lib/usrp/x300/x300_impl.hpp
index 2de295bd9..27f3f130e 100644
--- a/host/lib/usrp/x300/x300_impl.hpp
+++ b/host/lib/usrp/x300/x300_impl.hpp
@@ -52,15 +52,18 @@ static const size_t X300_RX_SW_BUFF_SIZE_ETH = 0x2000000;//32MiB For a
static const size_t X300_RX_SW_BUFF_SIZE_ETH_MACOS = 0x100000; //1Mib
//The FIFO closest to the DMA controller is 1023 elements deep for RX and 1029 elements deep for TX
-//where an element is 8 bytes. For best throughput ensure that the data frame fits in these buffers.
-//Also ensure that the kernel has enough frames to hold buffered TX and RX data
-static const size_t X300_PCIE_RX_DATA_FRAME_SIZE = 8184; //bytes
-static const size_t X300_PCIE_TX_DATA_FRAME_SIZE = 8184; //bytes
-static const size_t X300_PCIE_DATA_NUM_FRAMES = 2048;
-static const size_t X300_PCIE_MSG_FRAME_SIZE = 256; //bytes
-static const size_t X300_PCIE_MSG_NUM_FRAMES = 64;
-static const size_t X300_PCIE_MAX_CHANNELS = 6;
-static const size_t X300_PCIE_MAX_MUXED_XPORTS = 32;
+//where an element is 8 bytes. The buffers (number of frames * frame size) must be aligned to the
+//memory page size. For the control, we are getting lucky because 64 frames * 256 bytes each aligns
+//with the typical page size of 4096 bytes. Since most page sizes are 4096 bytes or some multiple of
+//that, keep the number of frames * frame size aligned to it.
+static const size_t X300_PCIE_RX_DATA_FRAME_SIZE = 4096; //bytes
+static const size_t X300_PCIE_RX_DATA_NUM_FRAMES = 4096;
+static const size_t X300_PCIE_TX_DATA_FRAME_SIZE = 4096; //bytes
+static const size_t X300_PCIE_TX_DATA_NUM_FRAMES = 4096;
+static const size_t X300_PCIE_MSG_FRAME_SIZE = 256; //bytes
+static const size_t X300_PCIE_MSG_NUM_FRAMES = 64;
+static const size_t X300_PCIE_MAX_CHANNELS = 6;
+static const size_t X300_PCIE_MAX_MUXED_XPORTS = 32;
static const size_t X300_10GE_DATA_FRAME_MAX_SIZE = 8000; // CHDR packet size in bytes
static const size_t X300_1GE_DATA_FRAME_MAX_SIZE = 1472; // CHDR packet size in bytes