From 7f7d9b04cfc59a8dd743ff3dc9c816b57675786e Mon Sep 17 00:00:00 2001 From: michael-west Date: Wed, 8 Mar 2017 17:16:10 -0800 Subject: Fix bounded buffer functions so they don't release the lock before waiting on condition variables. --- host/include/uhd/transport/bounded_buffer.ipp | 89 +++++++++++++++++---------- 1 file changed, 55 insertions(+), 34 deletions(-) (limited to 'host/include') diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 35ffb293b..daca3f04f 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -28,7 +28,8 @@ namespace uhd{ namespace transport{ - template class bounded_buffer_detail : boost::noncopyable{ + template class bounded_buffer_detail : boost::noncopyable + { public: bounded_buffer_detail(size_t capacity): @@ -38,79 +39,97 @@ namespace uhd{ namespace transport{ _not_empty_fcn = boost::bind(&bounded_buffer_detail::not_empty, this); } - UHD_INLINE bool push_with_haste(const elem_type &elem){ + UHD_INLINE bool push_with_haste(const elem_type &elem) + { boost::mutex::scoped_lock lock(_mutex); - if (_buffer.full()) return false; + if (_buffer.full()) + { + return false; + } _buffer.push_front(elem); - lock.unlock(); _empty_cond.notify_one(); return true; } - UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){ + UHD_INLINE bool push_with_pop_on_full(const elem_type &elem) + { boost::mutex::scoped_lock lock(_mutex); - if (_buffer.full()){ + if (_buffer.full()) + { _buffer.pop_back(); _buffer.push_front(elem); - lock.unlock(); _empty_cond.notify_one(); return false; } - else{ + else { _buffer.push_front(elem); - lock.unlock(); _empty_cond.notify_one(); return true; } } - UHD_INLINE void push_with_wait(const elem_type &elem){ - if (this->push_with_haste(elem)) return; + UHD_INLINE void push_with_wait(const elem_type &elem) + { boost::mutex::scoped_lock lock(_mutex); - _full_cond.wait(lock, _not_full_fcn); + if (_buffer.full()) + { + _full_cond.wait(lock, _not_full_fcn); + } _buffer.push_front(elem); - lock.unlock(); _empty_cond.notify_one(); } - UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){ - if (this->push_with_haste(elem)) return true; + UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout) + { boost::mutex::scoped_lock lock(_mutex); - if (not _full_cond.timed_wait( - lock, to_time_dur(timeout), _not_full_fcn - )) return false; + if (_buffer.full()) + { + if (not _full_cond.timed_wait(lock, + to_time_dur(timeout), _not_full_fcn)) + { + return false; + } + } _buffer.push_front(elem); - lock.unlock(); _empty_cond.notify_one(); return true; } - UHD_INLINE bool pop_with_haste(elem_type &elem){ + UHD_INLINE bool pop_with_haste(elem_type &elem) + { boost::mutex::scoped_lock lock(_mutex); - if (_buffer.empty()) return false; + if (_buffer.empty()) + { + return false; + } this->pop_back(elem); - lock.unlock(); _full_cond.notify_one(); return true; } - UHD_INLINE void pop_with_wait(elem_type &elem){ - if (this->pop_with_haste(elem)) return; + UHD_INLINE void pop_with_wait(elem_type &elem) + { boost::mutex::scoped_lock lock(_mutex); - _empty_cond.wait(lock, _not_empty_fcn); + if (_buffer.empty()) + { + _empty_cond.wait(lock, _not_empty_fcn); + } this->pop_back(elem); - lock.unlock(); _full_cond.notify_one(); } - UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){ - if (this->pop_with_haste(elem)) return true; + UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout) + { boost::mutex::scoped_lock lock(_mutex); - if (not _empty_cond.timed_wait( - lock, to_time_dur(timeout), _not_empty_fcn - )) return false; + if (_buffer.empty()) + { + if (not _empty_cond.timed_wait(lock, to_time_dur(timeout), + _not_empty_fcn)) + { + return false; + } + } this->pop_back(elem); - lock.unlock(); _full_cond.notify_one(); return true; } @@ -131,13 +150,15 @@ namespace uhd{ namespace transport{ * 2) assign the back element to empty * 3) pop the back to move the counter */ - UHD_INLINE void pop_back(elem_type &elem){ + UHD_INLINE void pop_back(elem_type &elem) + { elem = _buffer.back(); _buffer.back() = elem_type(); _buffer.pop_back(); } - static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ + static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout) + { return boost::posix_time::microseconds(long(timeout*1e6)); } -- cgit v1.2.3 From 5b07551577d5c596cb690484ad190d8ad8bac643 Mon Sep 17 00:00:00 2001 From: michael-west Date: Wed, 8 Mar 2017 17:17:39 -0800 Subject: Added class to add flow control to any zero_copy_if interface. --- host/include/uhd/transport/zero_copy_flow_ctrl.hpp | 58 ++++++ host/lib/transport/CMakeLists.txt | 1 + host/lib/transport/zero_copy_flow_ctrl.cpp | 227 +++++++++++++++++++++ 3 files changed, 286 insertions(+) create mode 100644 host/include/uhd/transport/zero_copy_flow_ctrl.hpp create mode 100644 host/lib/transport/zero_copy_flow_ctrl.cpp (limited to 'host/include') diff --git a/host/include/uhd/transport/zero_copy_flow_ctrl.hpp b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp new file mode 100644 index 000000000..8075c503d --- /dev/null +++ b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp @@ -0,0 +1,58 @@ +// +// Copyright 2017 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#ifndef INCLUDED_ZERO_COPY_FLOW_CTRL_HPP +#define INCLUDED_ZERO_COPY_FLOW_CTRL_HPP + +#include +#include +#include +#include + +namespace uhd{ namespace transport{ + +/*! + * Flow control function. + * \param buff buffer to be sent or receive buffer being released + * \return true if OK, false if not + */ +typedef boost::function flow_ctrl_func; + +/*! + * Adds flow control to any zero_copy_if transport. + */ +class UHD_API zero_copy_flow_ctrl : public virtual zero_copy_if { +public: + typedef boost::shared_ptr sptr; + + /*! + * Make flow controlled transport. + * + * \param transport a shared pointer to the transport interface + * \param send_flow_ctrl optional send flow control function called before buffer is sent + * \param recv_flow_ctrl optional receive flow control function called after buffer released + */ + static sptr make( + zero_copy_if::sptr transport, + flow_ctrl_func send_flow_ctrl, + flow_ctrl_func recv_flow_ctrl + ); +}; + +}} //namespace + +#endif /* INCLUDED_ZERO_COPY_FLOW_CTRL_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 44c8d59af..a6d84cc4a 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -129,6 +129,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp ) IF(ENABLE_X300) diff --git a/host/lib/transport/zero_copy_flow_ctrl.cpp b/host/lib/transport/zero_copy_flow_ctrl.cpp new file mode 100644 index 000000000..06d7934e2 --- /dev/null +++ b/host/lib/transport/zero_copy_flow_ctrl.cpp @@ -0,0 +1,227 @@ +// +// Copyright 2017 Ettus Research +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace uhd; +using namespace uhd::transport; + +typedef bounded_buffer bounded_buffer_t; + +class zero_copy_flow_ctrl_msb : public managed_send_buffer +{ +public: + zero_copy_flow_ctrl_msb( + flow_ctrl_func flow_ctrl + ) : + _mb(NULL), + _flow_ctrl(flow_ctrl) + { + /* NOP */ + } + + ~zero_copy_flow_ctrl_msb() + { + /* NOP */ + } + + void release() + { + if (_mb) + { + _mb->commit(size()); + while (_flow_ctrl and not _flow_ctrl(_mb)) {} + _mb.reset(); + } + } + + UHD_INLINE sptr get(sptr &mb) + { + _mb = mb; + return make(this, _mb->cast(), _mb->size()); + } + +private: + sptr _mb; + flow_ctrl_func _flow_ctrl; +}; + +class zero_copy_flow_ctrl_mrb : public managed_recv_buffer +{ +public: + zero_copy_flow_ctrl_mrb( + flow_ctrl_func flow_ctrl + ) : + _mb(NULL), + _flow_ctrl(flow_ctrl) + { + /* NOP */ + } + + ~zero_copy_flow_ctrl_mrb() + { + /* NOP */ + } + + void release() + { + if (_mb) + { + _mb->commit(size()); + while (_flow_ctrl and not _flow_ctrl(_mb)) {} + _mb.reset(); + } + } + + UHD_INLINE sptr get(sptr &mb) + { + _mb = mb; + return make(this, _mb->cast(), _mb->size()); + } + +private: + sptr _mb; + flow_ctrl_func _flow_ctrl; +}; + +/*********************************************************************** + * Zero copy offload transport: + * An intermediate transport that utilizes threading to free + * the main thread from any receive work. + **********************************************************************/ +class zero_copy_flow_ctrl_impl : public zero_copy_flow_ctrl { +public: + typedef boost::shared_ptr sptr; + + zero_copy_flow_ctrl_impl(zero_copy_if::sptr transport, + flow_ctrl_func send_flow_ctrl, + flow_ctrl_func recv_flow_ctrl) : + _transport(transport), + _send_buffers(transport->get_num_send_frames()), + _recv_buffers(transport->get_num_recv_frames()), + _send_buff_index(0), + _recv_buff_index(0), + _send_flow_ctrl(send_flow_ctrl), + _recv_flow_ctrl(recv_flow_ctrl) + { + UHD_LOG << "Created zero_copy_flow_ctrl" << std::endl; + + for (size_t i = 0; i < transport->get_num_send_frames(); i++) + { + _send_buffers[i] = boost::make_shared(_send_flow_ctrl); + } + for (size_t i = 0; i < transport->get_num_recv_frames(); i++) + { + _recv_buffers[i] = boost::make_shared(_recv_flow_ctrl); + } + } + + ~zero_copy_flow_ctrl_impl() + { + } + + /******************************************************************* + * Receive implementation: + * Pop the receive buffer pointer from the underlying transport + ******************************************************************/ + UHD_INLINE managed_recv_buffer::sptr get_recv_buff(double timeout) + { + managed_recv_buffer::sptr ptr; + managed_recv_buffer::sptr buff = _transport->get_recv_buff(timeout); + if (buff) + { + boost::shared_ptr mb = _recv_buffers[_recv_buff_index++]; + _recv_buff_index %= _recv_buffers.size(); + ptr = mb->get(buff); + } + return ptr; + } + + UHD_INLINE size_t get_num_recv_frames() const + { + return _transport->get_num_recv_frames(); + } + + UHD_INLINE size_t get_recv_frame_size() const + { + return _transport->get_recv_frame_size(); + } + + /******************************************************************* + * Send implementation: + * Pass the send buffer pointer from the underlying transport + ******************************************************************/ + managed_send_buffer::sptr get_send_buff(double timeout) + { + managed_send_buffer::sptr ptr; + managed_send_buffer::sptr buff = _transport->get_send_buff(timeout); + if (buff) + { + boost::shared_ptr mb = _send_buffers[_send_buff_index++]; + _send_buff_index %= _send_buffers.size(); + ptr = mb->get(buff); + } + return ptr; + } + + UHD_INLINE size_t get_num_send_frames() const + { + return _transport->get_num_send_frames(); + } + + UHD_INLINE size_t get_send_frame_size() const + { + return _transport->get_send_frame_size(); + } + +private: + // The underlying transport + zero_copy_if::sptr _transport; + + // buffers + std::vector< boost::shared_ptr > _send_buffers; + std::vector< boost::shared_ptr > _recv_buffers; + size_t _send_buff_index; + size_t _recv_buff_index; + + // Flow control functions + flow_ctrl_func _send_flow_ctrl; + flow_ctrl_func _recv_flow_ctrl; +}; + +zero_copy_flow_ctrl::sptr zero_copy_flow_ctrl::make( + zero_copy_if::sptr transport, + flow_ctrl_func send_flow_ctrl, + flow_ctrl_func recv_flow_ctrl +) +{ + zero_copy_flow_ctrl_impl::sptr zero_copy_flow_ctrl( + new zero_copy_flow_ctrl_impl(transport, send_flow_ctrl, recv_flow_ctrl) + ); + + return zero_copy_flow_ctrl; +} -- cgit v1.2.3