diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/include/uhd/transport/alignment_buffer.hpp | 22 | ||||
-rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 10 |
2 files changed, 31 insertions, 1 deletions
diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp index dc6ccc3ed..5766c2284 100644 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -56,6 +56,11 @@ namespace uhd{ namespace transport{ const seq_type &seq, size_t index ){ + //clear the buffer for this index if the seqs are mis-ordered + if (seq < _last_seqs[index]){ + _buffs[index]->clear(); + _there_was_a_clear = true; + } _last_seqs[index] = seq; return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq)); } @@ -79,6 +84,18 @@ namespace uhd{ namespace transport{ //get an aligned set of elements from the buffers: while(indexes_to_do.size() != 0){ + + //respond to a clear by starting from scratch + if(_there_was_a_clear){ + _there_was_a_clear = false; + indexes_to_do = _all_indexes; + index = indexes_to_do.front(); + if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false; + elems[index] = buff_contents_tmp.first; + expected_seq_id = buff_contents_tmp.second; + indexes_to_do.pop_front(); + } + //pop an element off for this index index = indexes_to_do.front(); if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false; @@ -118,14 +135,17 @@ namespace uhd{ namespace transport{ typedef bounded_buffer<buff_contents_type> bounded_buffer_type; typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr; std::vector<bounded_buffer_sptr> _buffs; + std::vector<seq_type> _last_seqs; std::list<size_t> _all_indexes; + bool _there_was_a_clear; //private constructor - alignment_buffer(size_t capacity, size_t width){ + alignment_buffer(size_t capacity, size_t width) : _last_seqs(width){ for (size_t i = 0; i < width; i++){ _buffs.push_back(bounded_buffer_type::make(capacity)); _all_indexes.push_back(i); } + _there_was_a_clear = false; } }; diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index baecd6382..94c360fba 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -129,6 +129,16 @@ namespace uhd{ namespace transport{ return true; } + /*! + * Clear all elements from the bounded_buffer. + */ + UHD_INLINE void clear(void){ + boost::unique_lock<boost::mutex> lock(_mutex); + while (not_empty()) _buffer.pop_back(); + lock.unlock(); + _full_cond.notify_one(); + } + private: boost::mutex _mutex; boost::condition _empty_cond, _full_cond; |