aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/include/uhd/transport/CMakeLists.txt2
-rw-r--r--host/include/uhd/transport/alignment_buffer.hpp136
-rw-r--r--host/include/uhd/transport/bounded_buffer.hpp141
-rw-r--r--host/test/CMakeLists.txt1
-rw-r--r--host/test/bounded_buffer_test.cpp46
5 files changed, 326 insertions, 0 deletions
diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt
index 4cefffa24..23a4aae94 100644
--- a/host/include/uhd/transport/CMakeLists.txt
+++ b/host/include/uhd/transport/CMakeLists.txt
@@ -17,6 +17,8 @@
INSTALL(FILES
+ alignment_buffer.hpp
+ bounded_buffer.hpp
convert_types.hpp
if_addrs.hpp
udp_simple.hpp
diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp
new file mode 100644
index 000000000..7fa4f2694
--- /dev/null
+++ b/host/include/uhd/transport/alignment_buffer.hpp
@@ -0,0 +1,136 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
+#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/shared_ptr.hpp>
+#include <utility>
+#include <vector>
+
+namespace uhd{ namespace transport{
+
+ /*!
+ * Imlement a templated alignment buffer:
+ * Used for aligning asynchronously pushed elements with matching ids.
+ */
+ template <typename elem_type, typename seq_type> class alignment_buffer{
+ public:
+ typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr;
+
+ /*!
+ * Create the alignment buffer.
+ * \param capacity the maximum elements per index
+ * \param width the number of elements to align
+ */
+ alignment_buffer(size_t capacity, size_t width){
+ _buffs.resize(width);
+ for (size_t i = 0; i < width; i++){
+ _buffs[i].buff = bounded_buffer_sptr(new bounded_buffer_type(capacity));
+ _buffs[i].has_popped_element = false;
+ }
+ }
+
+ /*!
+ * Destroy this alignment buffer.
+ */
+ ~alignment_buffer(void){
+ /* NOP */
+ }
+
+ /*!
+ * Push a single element into the buffer specified by index.
+ * Notify the condition variable for a thread blocked in pop.
+ * \param elem the element to push
+ * \param seq the sequence identifier
+ * \param index the buffer index
+ */
+ void push_elem_with_wait(const elem_type &elem, const seq_type &seq, size_t index){
+ _buffs[index].buff.push_with_wait(buff_contents_type(elem, seq));
+ _pushed_cond.notify_one();
+ }
+
+ /*!
+ * Pop an aligned set of elements from this alignment buffer.
+ * \param elems a collection to store the aligned elements
+ */
+ template <typename elems_type>
+ void pop_elems_with_wait(elems_type &elems){
+ //TODO................................
+ buff_contents_type buff_contents_tmp;
+ for (size_t i = 0; i < _buffs.size();){
+ if (_buffs[i].has_popped_element){
+ i++:
+ continue;
+ }
+ _buffs[i].pop_with_wait(buff_contents_tmp);
+ if (buff_contents_tmp.second == _expected_seq_id){
+ _buffs[i].has_popped_element = true;
+ i++;
+ continue;
+ }
+
+ //if the sequence number is older, pop until we get the current sequence number
+ //do this by setting has popped element false and continuing on the same condition
+ if (buff_contents_tmp.second < _expected_seq_id){
+ _buffs[i].has_popped_element = false;
+ continue;
+ }
+
+ //if the sequence number is newer, start from scratch at the new sequence number
+ //do this by setting all has popped elements false and restarting on index zero
+ if (buff_contents_tmp.second > _expected_seq_id){
+ _expected_seq_id = buff_contents_tmp.second;
+ for (size_t j = 0; j < i; j++){
+ _buffs[j].has_popped_element = false;
+ }
+ i = 0;
+ continue;
+ }
+ }
+ //if aligned
+ for (size_t i = 0; i < _buffs.size(); i++){
+ elems[i] = _buffs[i].popped_element;
+ _buffs[i].has_popped_element = false;
+ }
+ }
+
+ private:
+ //a vector of bounded buffers for each index
+ typedef std::pair<elem_type, seq_type> buff_contents_type;
+ typedef bounded_buffer<buff_contents_type> bounded_buffer_type;
+ typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr;
+ struct buff_type{
+ bounded_buffer_sptr buff;
+ elem_type popped_element;
+ bool has_popped_element;
+ };
+ std::vector<buff_type> _buffs;
+
+ //the seq identifier to align with
+ seq_type _expected_seq_id;
+
+ //a condition to notify when a new element is pushed
+ boost::condition_variable _pushed_cond;
+ };
+
+}} //namespace
+
+#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */
diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp
new file mode 100644
index 000000000..26fc9c0a0
--- /dev/null
+++ b/host/include/uhd/transport/bounded_buffer.hpp
@@ -0,0 +1,141 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP
+#define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP
+
+#include <uhd/config.hpp>
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/circular_buffer.hpp>
+#include <boost/thread/condition_variable.hpp>
+
+namespace uhd{ namespace transport{
+
+ /*!
+ * Imlement a templated bounded buffer:
+ * Used for passing elements between threads in a producer-consumer model.
+ * The bounded buffer implemented waits and timed waits with condition variables.
+ * The pop operation blocks on the bounded_buffer to become non empty.
+ * The push operation blocks on the bounded_buffer to become non full.
+ */
+ template <typename elem_type> class bounded_buffer{
+ public:
+ typedef boost::shared_ptr<bounded_buffer<elem_type> > sptr;
+
+ /*!
+ * Create a new bounded_buffer of a given size.
+ * \param capacity the bounded_buffer capacity
+ */
+ bounded_buffer(size_t capacity) : _buffer(capacity), _size(0){
+ /* NOP */
+ }
+
+ /*!
+ * Destroy this bounded_buffer.
+ */
+ ~bounded_buffer(void){
+ /* NOP */
+ }
+
+ /*!
+ * Is the bounded_buffer buffer not full?
+ * \return true for not full
+ */
+ bool is_not_full(void) const{
+ return _size != _buffer.capacity();
+ }
+
+ /*!
+ * Is the bounded_buffer buffer not empty?
+ * \return true for not empty
+ */
+ bool is_not_empty(void) const{
+ return _size != 0;
+ }
+
+ /*!
+ * Push a new element into the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-full.
+ * \param elem the new element to push
+ */
+ UHD_INLINE void push_with_wait(const elem_type &elem){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_full, this));
+ _buffer.push_front(elem); ++_size;
+ lock.unlock();
+ _empty_cond.notify_one();
+ }
+
+ /*!
+ * Push a new element into the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-full or timeout.
+ * \param elem the new element to push
+ * \param time the timeout time
+ * \return false when the operation times out
+ */
+ template<typename time_type> UHD_INLINE
+ bool push_with_timed_wait(const elem_type &elem, const time_type &time){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_full, this))) return false;
+ _buffer.push_front(elem); ++_size;
+ lock.unlock();
+ _empty_cond.notify_one();
+ return true;
+ }
+
+ /*!
+ * Pop an element from the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-empty.
+ * \param elem the element reference pop to
+ */
+ UHD_INLINE void pop_with_wait(elem_type &elem){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this));
+ elem = _buffer[--_size];
+ lock.unlock();
+ _full_cond.notify_one();
+ }
+
+ /*!
+ * Pop an element from the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-empty or timeout.
+ * \param elem the element reference pop to
+ * \param time the timeout time
+ * \return false when the operation times out
+ */
+ template<typename time_type> UHD_INLINE
+ bool pop_with_timed_wait(elem_type &elem, const time_type &time){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this))) return false;
+ elem = _buffer[--_size];
+ lock.unlock();
+ _full_cond.notify_one();
+ return true;
+ }
+
+ private:
+ boost::mutex _mutex;
+ boost::condition_variable _empty_cond, _full_cond;
+ boost::circular_buffer<elem_type> _buffer;
+ size_t _size;
+
+ };
+
+}} //namespace
+
+#endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP */
diff --git a/host/test/CMakeLists.txt b/host/test/CMakeLists.txt
index 61b0b503d..c7c6d7fad 100644
--- a/host/test/CMakeLists.txt
+++ b/host/test/CMakeLists.txt
@@ -21,6 +21,7 @@
ADD_EXECUTABLE(main_test
main_test.cpp
addr_test.cpp
+ bounded_buffer_test.cpp
dict_test.cpp
error_test.cpp
gain_handler_test.cpp
diff --git a/host/test/bounded_buffer_test.cpp b/host/test/bounded_buffer_test.cpp
new file mode 100644
index 000000000..5d6b6faec
--- /dev/null
+++ b/host/test/bounded_buffer_test.cpp
@@ -0,0 +1,46 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <boost/test/unit_test.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+
+//test #Include
+#include <uhd/transport/alignment_buffer.hpp>
+
+using namespace uhd::transport;
+
+static const boost::posix_time::milliseconds timeout(10);
+
+BOOST_AUTO_TEST_CASE(test_bounded_buffer){
+ bounded_buffer<int>::sptr bb(new bounded_buffer<int>(3));
+
+ //push elements, check for timeout
+ BOOST_CHECK(bb->push_with_timed_wait(0, timeout));
+ BOOST_CHECK(bb->push_with_timed_wait(1, timeout));
+ BOOST_CHECK(bb->push_with_timed_wait(2, timeout));
+ BOOST_CHECK(not bb->push_with_timed_wait(3, timeout));
+
+ int val;
+ //pop elements, check for timeout and check values
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 0);
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 1);
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 2);
+ BOOST_CHECK(not bb->pop_with_timed_wait(val, timeout));
+}