diff options
| -rw-r--r-- | host/include/uhd/transport/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 136 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 141 | ||||
| -rw-r--r-- | host/test/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/test/bounded_buffer_test.cpp | 46 | 
5 files changed, 326 insertions, 0 deletions
| diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt index 4cefffa24..23a4aae94 100644 --- a/host/include/uhd/transport/CMakeLists.txt +++ b/host/include/uhd/transport/CMakeLists.txt @@ -17,6 +17,8 @@  INSTALL(FILES +    alignment_buffer.hpp +    bounded_buffer.hpp      convert_types.hpp      if_addrs.hpp      udp_simple.hpp diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp new file mode 100644 index 000000000..7fa4f2694 --- /dev/null +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -0,0 +1,136 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP +#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/shared_ptr.hpp> +#include <utility> +#include <vector> + +namespace uhd{ namespace transport{ + +    /*! +     * Imlement a templated alignment buffer: +     * Used for aligning asynchronously pushed elements with matching ids. +     */ +    template <typename elem_type, typename seq_type> class alignment_buffer{ +    public: +        typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr; + +        /*! +         * Create the alignment buffer. +         * \param capacity the maximum elements per index +         * \param width the number of elements to align +         */ +        alignment_buffer(size_t capacity, size_t width){ +            _buffs.resize(width); +            for (size_t i = 0; i < width; i++){ +                _buffs[i].buff = bounded_buffer_sptr(new bounded_buffer_type(capacity)); +                _buffs[i].has_popped_element = false; +            } +        } + +        /*! +         * Destroy this alignment buffer. +         */ +        ~alignment_buffer(void){ +            /* NOP */ +        } + +        /*! +         * Push a single element into the buffer specified by index. +         * Notify the condition variable for a thread blocked in pop. +         * \param elem the element to push +         * \param seq the sequence identifier +         * \param index the buffer index +         */ +        void push_elem_with_wait(const elem_type &elem, const seq_type &seq, size_t index){ +            _buffs[index].buff.push_with_wait(buff_contents_type(elem, seq)); +            _pushed_cond.notify_one(); +        } + +        /*! +         * Pop an aligned set of elements from this alignment buffer. +         * \param elems a collection to store the aligned elements +         */ +        template <typename elems_type> +        void pop_elems_with_wait(elems_type &elems){ +            //TODO................................ +            buff_contents_type buff_contents_tmp; +            for (size_t i = 0; i < _buffs.size();){ +                if (_buffs[i].has_popped_element){ +                    i++: +                    continue; +                } +                _buffs[i].pop_with_wait(buff_contents_tmp); +                if (buff_contents_tmp.second == _expected_seq_id){ +                    _buffs[i].has_popped_element = true; +                    i++; +                    continue; +                } + +                //if the sequence number is older, pop until we get the current sequence number +                //do this by setting has popped element false and continuing on the same condition +                if (buff_contents_tmp.second < _expected_seq_id){ +                    _buffs[i].has_popped_element = false; +                    continue; +                } + +                //if the sequence number is newer, start from scratch at the new sequence number +                //do this by setting all has popped elements false and restarting on index zero +                if (buff_contents_tmp.second > _expected_seq_id){ +                    _expected_seq_id = buff_contents_tmp.second; +                    for (size_t j = 0; j < i; j++){ +                        _buffs[j].has_popped_element = false; +                    } +                    i = 0; +                    continue; +                } +            } +            //if aligned +            for (size_t i = 0; i < _buffs.size(); i++){ +                elems[i] = _buffs[i].popped_element; +                _buffs[i].has_popped_element = false; +            } +        } + +    private: +        //a vector of bounded buffers for each index +        typedef std::pair<elem_type, seq_type> buff_contents_type; +        typedef bounded_buffer<buff_contents_type> bounded_buffer_type; +        typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr; +        struct buff_type{ +            bounded_buffer_sptr buff; +            elem_type popped_element; +            bool has_popped_element; +        }; +        std::vector<buff_type> _buffs; + +        //the seq identifier to align with +        seq_type _expected_seq_id; + +        //a condition to notify when a new element is pushed +        boost::condition_variable _pushed_cond; +    }; + +}} //namespace + +#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */ diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp new file mode 100644 index 000000000..26fc9c0a0 --- /dev/null +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -0,0 +1,141 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP +#define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP + +#include <uhd/config.hpp> +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/circular_buffer.hpp> +#include <boost/thread/condition_variable.hpp> + +namespace uhd{ namespace transport{ + +    /*! +     * Imlement a templated bounded buffer: +     * Used for passing elements between threads in a producer-consumer model. +     * The bounded buffer implemented waits and timed waits with condition variables. +     * The pop operation blocks on the bounded_buffer to become non empty. +     * The push operation blocks on the bounded_buffer to become non full. +     */ +    template <typename elem_type> class bounded_buffer{ +    public: +        typedef boost::shared_ptr<bounded_buffer<elem_type> > sptr; + +        /*! +         * Create a new bounded_buffer of a given size. +         * \param capacity the bounded_buffer capacity +         */ +        bounded_buffer(size_t capacity) : _buffer(capacity), _size(0){ +            /* NOP */ +        } + +        /*! +         * Destroy this bounded_buffer. +         */ +        ~bounded_buffer(void){ +            /* NOP */ +        } + +        /*! +         * Is the bounded_buffer buffer not full? +         * \return true for not full +         */ +        bool is_not_full(void) const{ +            return _size != _buffer.capacity(); +        } + +        /*! +         * Is the bounded_buffer buffer not empty? +         * \return true for not empty +         */ +        bool is_not_empty(void) const{ +            return _size != 0; +        } + +        /*! +         * Push a new element into the bounded_buffer. +         * Wait until the bounded_buffer becomes non-full. +         * \param elem the new element to push +         */ +        UHD_INLINE void push_with_wait(const elem_type &elem){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_full, this)); +            _buffer.push_front(elem); ++_size; +            lock.unlock(); +            _empty_cond.notify_one(); +        } + +        /*! +         * Push a new element into the bounded_buffer. +         * Wait until the bounded_buffer becomes non-full or timeout. +         * \param elem the new element to push +         * \param time the timeout time +         * \return false when the operation times out +         */ +        template<typename time_type> UHD_INLINE +        bool push_with_timed_wait(const elem_type &elem, const time_type &time){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_full, this))) return false; +            _buffer.push_front(elem); ++_size; +            lock.unlock(); +            _empty_cond.notify_one(); +            return true; +        } + +        /*! +         * Pop an element from the bounded_buffer. +         * Wait until the bounded_buffer becomes non-empty. +         * \param elem the element reference pop to +         */ +        UHD_INLINE void pop_with_wait(elem_type &elem){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this)); +            elem = _buffer[--_size]; +            lock.unlock(); +            _full_cond.notify_one(); +        } + +        /*! +         * Pop an element from the bounded_buffer. +         * Wait until the bounded_buffer becomes non-empty or timeout. +         * \param elem the element reference pop to +         * \param time the timeout time +         * \return false when the operation times out +         */ +        template<typename time_type> UHD_INLINE +        bool pop_with_timed_wait(elem_type &elem, const time_type &time){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this))) return false; +            elem = _buffer[--_size]; +            lock.unlock(); +            _full_cond.notify_one(); +            return true; +        } + +    private: +        boost::mutex _mutex; +        boost::condition_variable _empty_cond, _full_cond; +        boost::circular_buffer<elem_type> _buffer; +        size_t _size; + +    }; + +}} //namespace + +#endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP */ diff --git a/host/test/CMakeLists.txt b/host/test/CMakeLists.txt index 61b0b503d..c7c6d7fad 100644 --- a/host/test/CMakeLists.txt +++ b/host/test/CMakeLists.txt @@ -21,6 +21,7 @@  ADD_EXECUTABLE(main_test      main_test.cpp      addr_test.cpp +    bounded_buffer_test.cpp      dict_test.cpp      error_test.cpp      gain_handler_test.cpp diff --git a/host/test/bounded_buffer_test.cpp b/host/test/bounded_buffer_test.cpp new file mode 100644 index 000000000..5d6b6faec --- /dev/null +++ b/host/test/bounded_buffer_test.cpp @@ -0,0 +1,46 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <boost/test/unit_test.hpp> +#include <uhd/transport/bounded_buffer.hpp> + +//test #Include +#include <uhd/transport/alignment_buffer.hpp> + +using namespace uhd::transport; + +static const boost::posix_time::milliseconds timeout(10); + +BOOST_AUTO_TEST_CASE(test_bounded_buffer){ +    bounded_buffer<int>::sptr bb(new bounded_buffer<int>(3)); + +    //push elements, check for timeout +    BOOST_CHECK(bb->push_with_timed_wait(0, timeout)); +    BOOST_CHECK(bb->push_with_timed_wait(1, timeout)); +    BOOST_CHECK(bb->push_with_timed_wait(2, timeout)); +    BOOST_CHECK(not bb->push_with_timed_wait(3, timeout)); + +    int val; +    //pop elements, check for timeout and check values +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 0); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 1); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 2); +    BOOST_CHECK(not bb->pop_with_timed_wait(val, timeout)); +} | 
