diff options
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 22 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 10 | 
2 files changed, 31 insertions, 1 deletions
| diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp index dc6ccc3ed..5766c2284 100644 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -56,6 +56,11 @@ namespace uhd{ namespace transport{              const seq_type &seq,              size_t index          ){ +            //clear the buffer for this index if the seqs are mis-ordered +            if (seq < _last_seqs[index]){ +                _buffs[index]->clear(); +                _there_was_a_clear = true; +            } _last_seqs[index] = seq;              return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq));          } @@ -79,6 +84,18 @@ namespace uhd{ namespace transport{              //get an aligned set of elements from the buffers:              while(indexes_to_do.size() != 0){ + +                //respond to a clear by starting from scratch +                if(_there_was_a_clear){ +                    _there_was_a_clear = false; +                    indexes_to_do = _all_indexes; +                    index = indexes_to_do.front(); +                    if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false; +                    elems[index] = buff_contents_tmp.first; +                    expected_seq_id = buff_contents_tmp.second; +                    indexes_to_do.pop_front(); +                } +                  //pop an element off for this index                  index = indexes_to_do.front();                  if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false; @@ -118,14 +135,17 @@ namespace uhd{ namespace transport{          typedef bounded_buffer<buff_contents_type> bounded_buffer_type;          typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr;          std::vector<bounded_buffer_sptr> _buffs; +        std::vector<seq_type> _last_seqs;          std::list<size_t> _all_indexes; +        bool _there_was_a_clear;          //private constructor -        alignment_buffer(size_t capacity, size_t width){ +        alignment_buffer(size_t capacity, size_t width) : _last_seqs(width){              for (size_t i = 0; i < width; i++){                  _buffs.push_back(bounded_buffer_type::make(capacity));                  _all_indexes.push_back(i);              } +            _there_was_a_clear = false;          }      }; diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index baecd6382..94c360fba 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -129,6 +129,16 @@ namespace uhd{ namespace transport{              return true;          } +        /*! +         * Clear all elements from the bounded_buffer. +         */ +        UHD_INLINE void clear(void){ +            boost::unique_lock<boost::mutex> lock(_mutex); +            while (not_empty()) _buffer.pop_back(); +            lock.unlock(); +            _full_cond.notify_one(); +        } +      private:          boost::mutex _mutex;          boost::condition _empty_cond, _full_cond; | 
