summaryrefslogtreecommitdiffstats
path: root/host/include
diff options
context:
space:
mode:
Diffstat (limited to 'host/include')
-rw-r--r--host/include/uhd/transport/alignment_buffer.hpp97
-rw-r--r--host/include/uhd/transport/bounded_buffer.hpp53
2 files changed, 78 insertions, 72 deletions
diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp
index 7fa4f2694..b33b80da9 100644
--- a/host/include/uhd/transport/alignment_buffer.hpp
+++ b/host/include/uhd/transport/alignment_buffer.hpp
@@ -41,10 +41,9 @@ namespace uhd{ namespace transport{
* \param width the number of elements to align
*/
alignment_buffer(size_t capacity, size_t width){
- _buffs.resize(width);
for (size_t i = 0; i < width; i++){
- _buffs[i].buff = bounded_buffer_sptr(new bounded_buffer_type(capacity));
- _buffs[i].has_popped_element = false;
+ _buffs.push_back(bounded_buffer_sptr(new bounded_buffer_type(capacity)));
+ _all_indexes.push_back(i);
}
}
@@ -56,60 +55,72 @@ namespace uhd{ namespace transport{
}
/*!
- * Push a single element into the buffer specified by index.
- * Notify the condition variable for a thread blocked in pop.
+ * Push an element with sequence id into the buffer at index.
* \param elem the element to push
* \param seq the sequence identifier
* \param index the buffer index
+ * \return true if the element fit without popping for space
*/
- void push_elem_with_wait(const elem_type &elem, const seq_type &seq, size_t index){
- _buffs[index].buff.push_with_wait(buff_contents_type(elem, seq));
- _pushed_cond.notify_one();
+ UHD_INLINE bool push_with_pop_on_full(
+ const elem_type &elem,
+ const seq_type &seq,
+ size_t index
+ ){
+ return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq));
}
/*!
* Pop an aligned set of elements from this alignment buffer.
* \param elems a collection to store the aligned elements
+ * \param time the timeout time
+ * \return false when the operation times out
*/
- template <typename elems_type>
- void pop_elems_with_wait(elems_type &elems){
- //TODO................................
+ template <typename elems_type, typename time_type>
+ bool pop_elems_with_timed_wait(elems_type &elems, const time_type &time){
buff_contents_type buff_contents_tmp;
- for (size_t i = 0; i < _buffs.size();){
- if (_buffs[i].has_popped_element){
- i++:
- continue;
+ std::list<size_t> indexes_to_do(_all_indexes);
+
+ //the seq identifier to align with
+ seq_type expected_seq_id = seq_type();
+ bool expected_seq_id_valid = false;
+
+ //get an aligned set of elements from the buffers:
+ while(indexes_to_do.size() != 0){
+ size_t index = indexes_to_do.back();
+
+ //pop an element off for this index
+ if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false;
+
+ //grab the current sequence id if not valid
+ if (not expected_seq_id_valid){
+ expected_seq_id_valid = true;
+ expected_seq_id = buff_contents_tmp.second;
}
- _buffs[i].pop_with_wait(buff_contents_tmp);
- if (buff_contents_tmp.second == _expected_seq_id){
- _buffs[i].has_popped_element = true;
- i++;
+
+ //if the sequence id matches:
+ // store the popped element into the output,
+ // remove this index from the list and continue
+ if (buff_contents_tmp.second == expected_seq_id){
+ elems[index] = buff_contents_tmp.first;
+ indexes_to_do.pop_back();
continue;
}
- //if the sequence number is older, pop until we get the current sequence number
- //do this by setting has popped element false and continuing on the same condition
- if (buff_contents_tmp.second < _expected_seq_id){
- _buffs[i].has_popped_element = false;
+ //if the sequence id is older:
+ // continue with the same index to try again
+ if (buff_contents_tmp.second < expected_seq_id){
continue;
}
- //if the sequence number is newer, start from scratch at the new sequence number
- //do this by setting all has popped elements false and restarting on index zero
- if (buff_contents_tmp.second > _expected_seq_id){
- _expected_seq_id = buff_contents_tmp.second;
- for (size_t j = 0; j < i; j++){
- _buffs[j].has_popped_element = false;
- }
- i = 0;
+ //if the sequence id is newer:
+ // start from scratch at the new sequence number
+ if (buff_contents_tmp.second > expected_seq_id){
+ expected_seq_id = buff_contents_tmp.second;
+ indexes_to_do = _all_indexes;
continue;
}
}
- //if aligned
- for (size_t i = 0; i < _buffs.size(); i++){
- elems[i] = _buffs[i].popped_element;
- _buffs[i].has_popped_element = false;
- }
+ return true;
}
private:
@@ -117,18 +128,8 @@ namespace uhd{ namespace transport{
typedef std::pair<elem_type, seq_type> buff_contents_type;
typedef bounded_buffer<buff_contents_type> bounded_buffer_type;
typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr;
- struct buff_type{
- bounded_buffer_sptr buff;
- elem_type popped_element;
- bool has_popped_element;
- };
- std::vector<buff_type> _buffs;
-
- //the seq identifier to align with
- seq_type _expected_seq_id;
-
- //a condition to notify when a new element is pushed
- boost::condition_variable _pushed_cond;
+ std::vector<bounded_buffer_sptr> _buffs;
+ std::list<size_t> _all_indexes;
};
}} //namespace
diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp
index 26fc9c0a0..c50a626fb 100644
--- a/host/include/uhd/transport/bounded_buffer.hpp
+++ b/host/include/uhd/transport/bounded_buffer.hpp
@@ -22,7 +22,7 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/circular_buffer.hpp>
-#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/condition.hpp>
namespace uhd{ namespace transport{
@@ -41,7 +41,7 @@ namespace uhd{ namespace transport{
* Create a new bounded_buffer of a given size.
* \param capacity the bounded_buffer capacity
*/
- bounded_buffer(size_t capacity) : _buffer(capacity), _size(0){
+ bounded_buffer(size_t capacity) : _buffer(capacity){
/* NOP */
}
@@ -53,19 +53,23 @@ namespace uhd{ namespace transport{
}
/*!
- * Is the bounded_buffer buffer not full?
- * \return true for not full
- */
- bool is_not_full(void) const{
- return _size != _buffer.capacity();
- }
-
- /*!
- * Is the bounded_buffer buffer not empty?
- * \return true for not empty
+ * Push a new element into the bounded buffer.
+ * If the buffer is full prior to the push,
+ * make room by poping the oldest element.
+ * \param elem the new element to push
+ * \return true if the element fit without popping for space
*/
- bool is_not_empty(void) const{
- return _size != 0;
+ UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if(_buffer.full()){
+ _buffer.pop_back();
+ _buffer.push_front(elem);
+ return false;
+ }
+ else{
+ _buffer.push_front(elem);
+ return true;
+ }
}
/*!
@@ -75,8 +79,8 @@ namespace uhd{ namespace transport{
*/
UHD_INLINE void push_with_wait(const elem_type &elem){
boost::unique_lock<boost::mutex> lock(_mutex);
- _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_full, this));
- _buffer.push_front(elem); ++_size;
+ _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_full, this));
+ _buffer.push_front(elem);
lock.unlock();
_empty_cond.notify_one();
}
@@ -91,8 +95,8 @@ namespace uhd{ namespace transport{
template<typename time_type> UHD_INLINE
bool push_with_timed_wait(const elem_type &elem, const time_type &time){
boost::unique_lock<boost::mutex> lock(_mutex);
- if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_full, this))) return false;
- _buffer.push_front(elem); ++_size;
+ if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_full, this))) return false;
+ _buffer.push_front(elem);
lock.unlock();
_empty_cond.notify_one();
return true;
@@ -105,8 +109,8 @@ namespace uhd{ namespace transport{
*/
UHD_INLINE void pop_with_wait(elem_type &elem){
boost::unique_lock<boost::mutex> lock(_mutex);
- _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this));
- elem = _buffer[--_size];
+ _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_empty, this));
+ elem = _buffer.back(); _buffer.pop_back();
lock.unlock();
_full_cond.notify_one();
}
@@ -121,8 +125,8 @@ namespace uhd{ namespace transport{
template<typename time_type> UHD_INLINE
bool pop_with_timed_wait(elem_type &elem, const time_type &time){
boost::unique_lock<boost::mutex> lock(_mutex);
- if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::is_not_empty, this))) return false;
- elem = _buffer[--_size];
+ if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_empty, this))) return false;
+ elem = _buffer.back(); _buffer.pop_back();
lock.unlock();
_full_cond.notify_one();
return true;
@@ -130,10 +134,11 @@ namespace uhd{ namespace transport{
private:
boost::mutex _mutex;
- boost::condition_variable _empty_cond, _full_cond;
+ boost::condition _empty_cond, _full_cond;
boost::circular_buffer<elem_type> _buffer;
- size_t _size;
+ bool not_full(void) const{return not _buffer.full();}
+ bool not_empty(void) const{return not _buffer.empty();}
};
}} //namespace