diff options
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 97 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 53 | ||||
| -rw-r--r-- | host/test/bounded_buffer_test.cpp | 79 | 
3 files changed, 153 insertions, 76 deletions
| diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp index 7fa4f2694..b33b80da9 100644 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -41,10 +41,9 @@ namespace uhd{ namespace transport{           * \param width the number of elements to align           */          alignment_buffer(size_t capacity, size_t width){ -            _buffs.resize(width);              for (size_t i = 0; i < width; i++){ -                _buffs[i].buff = bounded_buffer_sptr(new bounded_buffer_type(capacity)); -                _buffs[i].has_popped_element = false; +                _buffs.push_back(bounded_buffer_sptr(new bounded_buffer_type(capacity))); +                _all_indexes.push_back(i);              }          } @@ -56,60 +55,72 @@ namespace uhd{ namespace transport{          }          /*! -         * Push a single element into the buffer specified by index. -         * Notify the condition variable for a thread blocked in pop. +         * Push an element with sequence id into the buffer at index.           * \param elem the element to push           * \param seq the sequence identifier           * \param index the buffer index +         * \return true if the element fit without popping for space           */ -        void push_elem_with_wait(const elem_type &elem, const seq_type &seq, size_t index){ -            _buffs[index].buff.push_with_wait(buff_contents_type(elem, seq)); -            _pushed_cond.notify_one(); +        UHD_INLINE bool push_with_pop_on_full( +            const elem_type &elem, +            const seq_type &seq, +            size_t index +        ){ +            return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq));          }          /*!           * Pop an aligned set of elements from this alignment buffer.           * \param elems a collection to store the aligned elements +         * \param time the timeout time +         * \return false when the operation times out           */ -        template <typename elems_type> -        void pop_elems_with_wait(elems_type &elems){ -            //TODO................................ +        template <typename elems_type, typename time_type> +        bool pop_elems_with_timed_wait(elems_type &elems, const time_type &time){              buff_contents_type buff_contents_tmp; -            for (size_t i = 0; i < _buffs.size();){ -                if (_buffs[i].has_popped_element){ -                    i++: -                    continue; +            std::list<size_t> indexes_to_do(_all_indexes); + +            //the seq identifier to align with +            seq_type expected_seq_id = seq_type(); +            bool expected_seq_id_valid = false; + +            //get an aligned set of elements from the buffers: +            while(indexes_to_do.size() != 0){ +                size_t index = indexes_to_do.back(); + +                //pop an element off for this index +                if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false; + +                //grab the current sequence id if not valid +                if (not expected_seq_id_valid){ +                    expected_seq_id_valid = true; +                    expected_seq_id = buff_contents_tmp.second;                  } -                _buffs[i].pop_with_wait(buff_contents_tmp); -                if (buff_contents_tmp.second == _expected_seq_id){ -                    _buffs[i].has_popped_element = true; -                    i++; + +                //if the sequence id matches: +                //  store the popped element into the output, +                //  remove this index from the list and continue +                if (buff_contents_tmp.second == expected_seq_id){ +                    elems[index] = buff_contents_tmp.first; +                    indexes_to_do.pop_back();                      continue;                  } -                //if the sequence number is older, pop until we get the current sequence number -                //do this by setting has popped element false and continuing on the same condition -                if (buff_contents_tmp.second < _expected_seq_id){ -                    _buffs[i].has_popped_element = false; +                //if the sequence id is older: +                //  continue with the same index to try again +                if (buff_contents_tmp.second < expected_seq_id){                      continue;                  } -                //if the sequence number is newer, start from scratch at the new sequence number -                //do this by setting all has popped elements false and restarting on index zero -                if (buff_contents_tmp.second > _expected_seq_id){ -                    _expected_seq_id = buff_contents_tmp.second; -                    for (size_t j = 0; j < i; j++){ -                        _buffs[j].has_popped_element = false; -                    } -                    i = 0; +                //if the sequence id is newer: +                //  start from scratch at the new sequence number +                if (buff_contents_tmp.second > expected_seq_id){ +                    expected_seq_id = buff_contents_tmp.second; +                    indexes_to_do = _all_indexes;                      continue;                  }              } -            //if aligned -            for (size_t i = 0; i < _buffs.size(); i++){ -                elems[i] = _buffs[i].popped_element; -                _buffs[i].has_popped_element = false; -            } +            return true;          }      private: @@ -117,18 +128,8 @@ namespace uhd{ namespace transport{          typedef std::pair<elem_type, seq_type> buff_contents_type;          typedef bounded_buffer<buff_contents_type> bounded_buffer_type;          typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr; -        struct buff_type{ -            bounded_buffer_sptr buff; -            elem_type popped_element; -            bool has_popped_element; -        }; -        std::vector<buff_type> _buffs; - -        //the seq identifier to align with -        seq_type _expected_seq_id; - -        //a condition to notify when a new element is pushed -        boost::condition_variable _pushed_cond; +        std::vector<bounded_buffer_sptr> _buffs; +        std::list<size_t> _all_indexes;      };  }} //namespace diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index 26fc9c0a0..c50a626fb 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -22,7 +22,7 @@  #include <boost/bind.hpp>  #include <boost/shared_ptr.hpp>  #include <boost/circular_buffer.hpp> -#include <boost/thread/condition_variable.hpp> +#include <boost/thread/condition.hpp>  namespace uhd{ namespace transport{ @@ -41,7 +41,7 @@ namespace uhd{ namespace transport{           * Create a new bounded_buffer of a given size.           * \param capacity the bounded_buffer capacity           */ -        bounded_buffer(size_t capacity) : _buffer(capacity), _size(0){ +        bounded_buffer(size_t capacity) : _buffer(capacity){              /* NOP */          } @@ -53,19 +53,23 @@ namespace uhd{ namespace transport{          }          /*! -         * Is the bounded_buffer buffer not full? -         * \return true for not full -         */ -        bool is_not_full(void) const{ -            return _size != _buffer.capacity(); -        } - -        /*! -         * Is the bounded_buffer buffer not empty? -         * \return true for not empty +         * Push a new element into the bounded buffer. +         * If the buffer is full prior to the push, +         * make room by poping the oldest element. +         * \param elem the new element to push +         * \return true if the element fit without popping for space           */ -        bool is_not_empty(void) const{ -            return _size != 0; +        UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            if(_buffer.full()){ +                _buffer.pop_back(); +                _buffer.push_front(elem); +                return false; +            } +            else{ +                _buffer.push_front(elem); +                return true; +            }          }          /*! @@ -75,8 +79,8 @@ namespace uhd{ namespace transport{           */          UHD_INLINE void push_with_wait(const elem_type &elem){              boost::unique_lock<boost::mutex> lock(_mutex); -            _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_full, this)); -            _buffer.push_front(elem); ++_size; +            _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_full, this)); +            _buffer.push_front(elem);              lock.unlock();              _empty_cond.notify_one();          } @@ -91,8 +95,8 @@ namespace uhd{ namespace transport{          template<typename time_type> UHD_INLINE          bool push_with_timed_wait(const elem_type &elem, const time_type &time){              boost::unique_lock<boost::mutex> lock(_mutex); -            if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_full, this))) return false; -            _buffer.push_front(elem); ++_size; +            if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_full, this))) return false; +            _buffer.push_front(elem);              lock.unlock();              _empty_cond.notify_one();              return true; @@ -105,8 +109,8 @@ namespace uhd{ namespace transport{           */          UHD_INLINE void pop_with_wait(elem_type &elem){              boost::unique_lock<boost::mutex> lock(_mutex); -            _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this)); -            elem = _buffer[--_size]; +            _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_empty, this)); +            elem = _buffer.back(); _buffer.pop_back();              lock.unlock();              _full_cond.notify_one();          } @@ -121,8 +125,8 @@ namespace uhd{ namespace transport{          template<typename time_type> UHD_INLINE          bool pop_with_timed_wait(elem_type &elem, const time_type &time){              boost::unique_lock<boost::mutex> lock(_mutex); -            if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this))) return false; -            elem = _buffer[--_size]; +            if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_empty, this))) return false; +            elem = _buffer.back(); _buffer.pop_back();              lock.unlock();              _full_cond.notify_one();              return true; @@ -130,10 +134,11 @@ namespace uhd{ namespace transport{      private:          boost::mutex _mutex; -        boost::condition_variable _empty_cond, _full_cond; +        boost::condition _empty_cond, _full_cond;          boost::circular_buffer<elem_type> _buffer; -        size_t _size; +        bool not_full(void) const{return not _buffer.full();} +        bool not_empty(void) const{return not _buffer.empty();}      };  }} //namespace diff --git a/host/test/bounded_buffer_test.cpp b/host/test/bounded_buffer_test.cpp index 5d6b6faec..dba1a4258 100644 --- a/host/test/bounded_buffer_test.cpp +++ b/host/test/bounded_buffer_test.cpp @@ -18,14 +18,11 @@  #include <boost/test/unit_test.hpp>  #include <uhd/transport/bounded_buffer.hpp> -//test #Include -#include <uhd/transport/alignment_buffer.hpp> -  using namespace uhd::transport;  static const boost::posix_time::milliseconds timeout(10); -BOOST_AUTO_TEST_CASE(test_bounded_buffer){ +BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_timed_wait){      bounded_buffer<int>::sptr bb(new bounded_buffer<int>(3));      //push elements, check for timeout @@ -44,3 +41,77 @@ BOOST_AUTO_TEST_CASE(test_bounded_buffer){      BOOST_CHECK_EQUAL(val, 2);      BOOST_CHECK(not bb->pop_with_timed_wait(val, timeout));  } + +BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){ +    bounded_buffer<int>::sptr bb(new bounded_buffer<int>(3)); + +    //push elements, check for timeout +    BOOST_CHECK(bb->push_with_pop_on_full(0)); +    BOOST_CHECK(bb->push_with_pop_on_full(1)); +    BOOST_CHECK(bb->push_with_pop_on_full(2)); +    BOOST_CHECK(not bb->push_with_pop_on_full(3)); + +    int val; +    //pop elements, check for timeout and check values +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 1); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 2); +    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK_EQUAL(val, 3); +} + +#include <uhd/transport/alignment_buffer.hpp> +#include <boost/assign/list_of.hpp> + +using namespace boost::assign; + +BOOST_AUTO_TEST_CASE(test_alignment_buffer_tmp){ +    alignment_buffer<int, size_t>::sptr ab(new alignment_buffer<int, size_t>(5, 3)); +    //load index 0 with all good seq numbers +    BOOST_CHECK(ab->push_with_pop_on_full(0, 0, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(1, 1, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(2, 2, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(3, 3, 0)); +    BOOST_CHECK(ab->push_with_pop_on_full(4, 4, 0)); + +    //load index 1 with some skipped seq numbers +    BOOST_CHECK(ab->push_with_pop_on_full(10, 0, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(11, 1, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(14, 4, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(15, 5, 1)); +    BOOST_CHECK(ab->push_with_pop_on_full(16, 6, 1)); + +    //load index 2 with all good seq numbers +    BOOST_CHECK(ab->push_with_pop_on_full(20, 0, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(21, 1, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(22, 2, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(23, 3, 2)); +    BOOST_CHECK(ab->push_with_pop_on_full(24, 4, 2)); + +    //readback aligned values +    std::vector<int> aligned_elems(3); + +    std::vector<int> expected_elems0 = list_of(0)(10)(20); +    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); +    BOOST_CHECK_EQUAL_COLLECTIONS( +        aligned_elems.begin(), aligned_elems.end(), +        expected_elems0.begin(), expected_elems0.end() +    ); + +    std::vector<int> expected_elems1 = list_of(1)(11)(21); +    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); +    BOOST_CHECK_EQUAL_COLLECTIONS( +        aligned_elems.begin(), aligned_elems.end(), +        expected_elems1.begin(), expected_elems1.end() +    ); + +    //there was a skip now find 4 + +    std::vector<int> expected_elems4 = list_of(4)(14)(24); +    BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout)); +    BOOST_CHECK_EQUAL_COLLECTIONS( +        aligned_elems.begin(), aligned_elems.end(), +        expected_elems4.begin(), expected_elems4.end() +    ); +} | 
