aboutsummaryrefslogtreecommitdiffstats
path: root/host/include
diff options
context:
space:
mode:
Diffstat (limited to 'host/include')
-rw-r--r--host/include/uhd/transport/bounded_buffer.ipp89
-rw-r--r--host/include/uhd/transport/zero_copy_flow_ctrl.hpp58
2 files changed, 113 insertions, 34 deletions
diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp
index 35ffb293b..daca3f04f 100644
--- a/host/include/uhd/transport/bounded_buffer.ipp
+++ b/host/include/uhd/transport/bounded_buffer.ipp
@@ -28,7 +28,8 @@
namespace uhd{ namespace transport{
- template <typename elem_type> class bounded_buffer_detail : boost::noncopyable{
+ template <typename elem_type> class bounded_buffer_detail : boost::noncopyable
+ {
public:
bounded_buffer_detail(size_t capacity):
@@ -38,79 +39,97 @@ namespace uhd{ namespace transport{
_not_empty_fcn = boost::bind(&bounded_buffer_detail<elem_type>::not_empty, this);
}
- UHD_INLINE bool push_with_haste(const elem_type &elem){
+ UHD_INLINE bool push_with_haste(const elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (_buffer.full()) return false;
+ if (_buffer.full())
+ {
+ return false;
+ }
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return true;
}
- UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){
+ UHD_INLINE bool push_with_pop_on_full(const elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (_buffer.full()){
+ if (_buffer.full())
+ {
_buffer.pop_back();
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return false;
}
- else{
+ else {
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return true;
}
}
- UHD_INLINE void push_with_wait(const elem_type &elem){
- if (this->push_with_haste(elem)) return;
+ UHD_INLINE void push_with_wait(const elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- _full_cond.wait(lock, _not_full_fcn);
+ if (_buffer.full())
+ {
+ _full_cond.wait(lock, _not_full_fcn);
+ }
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
}
- UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){
- if (this->push_with_haste(elem)) return true;
+ UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (not _full_cond.timed_wait(
- lock, to_time_dur(timeout), _not_full_fcn
- )) return false;
+ if (_buffer.full())
+ {
+ if (not _full_cond.timed_wait(lock,
+ to_time_dur(timeout), _not_full_fcn))
+ {
+ return false;
+ }
+ }
_buffer.push_front(elem);
- lock.unlock();
_empty_cond.notify_one();
return true;
}
- UHD_INLINE bool pop_with_haste(elem_type &elem){
+ UHD_INLINE bool pop_with_haste(elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (_buffer.empty()) return false;
+ if (_buffer.empty())
+ {
+ return false;
+ }
this->pop_back(elem);
- lock.unlock();
_full_cond.notify_one();
return true;
}
- UHD_INLINE void pop_with_wait(elem_type &elem){
- if (this->pop_with_haste(elem)) return;
+ UHD_INLINE void pop_with_wait(elem_type &elem)
+ {
boost::mutex::scoped_lock lock(_mutex);
- _empty_cond.wait(lock, _not_empty_fcn);
+ if (_buffer.empty())
+ {
+ _empty_cond.wait(lock, _not_empty_fcn);
+ }
this->pop_back(elem);
- lock.unlock();
_full_cond.notify_one();
}
- UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){
- if (this->pop_with_haste(elem)) return true;
+ UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout)
+ {
boost::mutex::scoped_lock lock(_mutex);
- if (not _empty_cond.timed_wait(
- lock, to_time_dur(timeout), _not_empty_fcn
- )) return false;
+ if (_buffer.empty())
+ {
+ if (not _empty_cond.timed_wait(lock, to_time_dur(timeout),
+ _not_empty_fcn))
+ {
+ return false;
+ }
+ }
this->pop_back(elem);
- lock.unlock();
_full_cond.notify_one();
return true;
}
@@ -131,13 +150,15 @@ namespace uhd{ namespace transport{
* 2) assign the back element to empty
* 3) pop the back to move the counter
*/
- UHD_INLINE void pop_back(elem_type &elem){
+ UHD_INLINE void pop_back(elem_type &elem)
+ {
elem = _buffer.back();
_buffer.back() = elem_type();
_buffer.pop_back();
}
- static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
+ static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout)
+ {
return boost::posix_time::microseconds(long(timeout*1e6));
}
diff --git a/host/include/uhd/transport/zero_copy_flow_ctrl.hpp b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp
new file mode 100644
index 000000000..8075c503d
--- /dev/null
+++ b/host/include/uhd/transport/zero_copy_flow_ctrl.hpp
@@ -0,0 +1,58 @@
+//
+// Copyright 2017 Ettus Research
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_ZERO_COPY_FLOW_CTRL_HPP
+#define INCLUDED_ZERO_COPY_FLOW_CTRL_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace uhd{ namespace transport{
+
+/*!
+ * Flow control function.
+ * \param buff buffer to be sent or receive buffer being released
+ * \return true if OK, false if not
+ */
+typedef boost::function<bool(managed_buffer::sptr buff)> flow_ctrl_func;
+
+/*!
+ * Adds flow control to any zero_copy_if transport.
+ */
+class UHD_API zero_copy_flow_ctrl : public virtual zero_copy_if {
+public:
+ typedef boost::shared_ptr<zero_copy_flow_ctrl> sptr;
+
+ /*!
+ * Make flow controlled transport.
+ *
+ * \param transport a shared pointer to the transport interface
+ * \param send_flow_ctrl optional send flow control function called before buffer is sent
+ * \param recv_flow_ctrl optional receive flow control function called after buffer released
+ */
+ static sptr make(
+ zero_copy_if::sptr transport,
+ flow_ctrl_func send_flow_ctrl,
+ flow_ctrl_func recv_flow_ctrl
+ );
+};
+
+}} //namespace
+
+#endif /* INCLUDED_ZERO_COPY_FLOW_CTRL_HPP */