aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/include/uhd/transport/alignment_buffer.hpp22
-rw-r--r--host/include/uhd/transport/bounded_buffer.hpp10
2 files changed, 31 insertions, 1 deletions
diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp
index dc6ccc3ed..5766c2284 100644
--- a/host/include/uhd/transport/alignment_buffer.hpp
+++ b/host/include/uhd/transport/alignment_buffer.hpp
@@ -56,6 +56,11 @@ namespace uhd{ namespace transport{
const seq_type &seq,
size_t index
){
+ //clear the buffer for this index if the seqs are mis-ordered
+ if (seq < _last_seqs[index]){
+ _buffs[index]->clear();
+ _there_was_a_clear = true;
+ } _last_seqs[index] = seq;
return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq));
}
@@ -79,6 +84,18 @@ namespace uhd{ namespace transport{
//get an aligned set of elements from the buffers:
while(indexes_to_do.size() != 0){
+
+ //respond to a clear by starting from scratch
+ if(_there_was_a_clear){
+ _there_was_a_clear = false;
+ indexes_to_do = _all_indexes;
+ index = indexes_to_do.front();
+ if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false;
+ elems[index] = buff_contents_tmp.first;
+ expected_seq_id = buff_contents_tmp.second;
+ indexes_to_do.pop_front();
+ }
+
//pop an element off for this index
index = indexes_to_do.front();
if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false;
@@ -118,14 +135,17 @@ namespace uhd{ namespace transport{
typedef bounded_buffer<buff_contents_type> bounded_buffer_type;
typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr;
std::vector<bounded_buffer_sptr> _buffs;
+ std::vector<seq_type> _last_seqs;
std::list<size_t> _all_indexes;
+ bool _there_was_a_clear;
//private constructor
- alignment_buffer(size_t capacity, size_t width){
+ alignment_buffer(size_t capacity, size_t width) : _last_seqs(width){
for (size_t i = 0; i < width; i++){
_buffs.push_back(bounded_buffer_type::make(capacity));
_all_indexes.push_back(i);
}
+ _there_was_a_clear = false;
}
};
diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp
index baecd6382..94c360fba 100644
--- a/host/include/uhd/transport/bounded_buffer.hpp
+++ b/host/include/uhd/transport/bounded_buffer.hpp
@@ -129,6 +129,16 @@ namespace uhd{ namespace transport{
return true;
}
+ /*!
+ * Clear all elements from the bounded_buffer.
+ */
+ UHD_INLINE void clear(void){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ while (not_empty()) _buffer.pop_back();
+ lock.unlock();
+ _full_cond.notify_one();
+ }
+
private:
boost::mutex _mutex;
boost::condition _empty_cond, _full_cond;