aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/include/uhd/transport/CMakeLists.txt2
-rw-r--r--host/include/uhd/transport/alignment_buffer.hpp69
-rw-r--r--host/include/uhd/transport/alignment_buffer.ipp144
-rw-r--r--host/include/uhd/transport/bounded_buffer.ipp13
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp187
-rw-r--r--host/test/buffer_test.cpp51
6 files changed, 150 insertions, 316 deletions
diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt
index 2c84c0724..ec3b7b113 100644
--- a/host/include/uhd/transport/CMakeLists.txt
+++ b/host/include/uhd/transport/CMakeLists.txt
@@ -17,8 +17,6 @@
INSTALL(FILES
- alignment_buffer.hpp
- alignment_buffer.ipp
bounded_buffer.hpp
bounded_buffer.ipp
convert_types.hpp
diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp
deleted file mode 100644
index f44a037f8..000000000
--- a/host/include/uhd/transport/alignment_buffer.hpp
+++ /dev/null
@@ -1,69 +0,0 @@
-//
-// Copyright 2010 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
-#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
-
-#include <uhd/config.hpp>
-#include <uhd/transport/bounded_buffer.hpp> //time_duration_t
-#include <boost/shared_ptr.hpp>
-#include <vector>
-
-namespace uhd{ namespace transport{
-
- /*!
- * Implement a templated alignment buffer:
- * Used for aligning asynchronously pushed elements with matching ids.
- */
- template <typename elem_type, typename seq_type> class alignment_buffer{
- public:
- typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr;
-
- /*!
- * Make a new alignment buffer object.
- * \param capacity the maximum elements per index
- * \param width the number of elements to align
- */
- static sptr make(size_t capacity, size_t width);
-
- /*!
- * Push an element with sequence id into the buffer at index.
- * \param elem the element to push
- * \param seq the sequence identifier
- * \param index the buffer index
- * \return true if the element fit without popping for space
- */
- virtual bool push_with_pop_on_full(
- const elem_type &elem, const seq_type &seq, size_t index
- ) = 0;
-
- /*!
- * Pop an aligned set of elements from this alignment buffer.
- * \param elems a collection to store the aligned elements
- * \param timeout the timeout in seconds
- * \return false when the operation times out
- */
- virtual bool pop_elems_with_timed_wait(
- std::vector<elem_type> &elems, double timeout
- ) = 0;
- };
-
-}} //namespace
-
-#include <uhd/transport/alignment_buffer.ipp>
-
-#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */
diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp
deleted file mode 100644
index 833b5d399..000000000
--- a/host/include/uhd/transport/alignment_buffer.ipp
+++ /dev/null
@@ -1,144 +0,0 @@
-//
-// Copyright 2010 Ettus Research LLC
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-//
-
-#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP
-#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP
-
-#include <uhd/transport/bounded_buffer.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <utility>
-
-namespace uhd{ namespace transport{ namespace{ /*anon*/
-
- /*!
- * Imlement a templated alignment buffer:
- * Used for aligning asynchronously pushed elements with matching ids.
- */
- template <typename elem_type, typename seq_type>
- class alignment_buffer_impl : public alignment_buffer<elem_type, seq_type>{
- public:
-
- alignment_buffer_impl(size_t capacity, size_t width) : _last_seqs(width){
- for (size_t i = 0; i < width; i++){
- _buffs.push_back(bounded_buffer<buff_contents_type>::make(capacity));
- _all_indexes.push_back(i);
- }
- _there_was_a_clear = false;
- }
-
- UHD_INLINE bool push_with_pop_on_full(
- const elem_type &elem, const seq_type &seq, size_t index
- ){
- //clear the buffer for this index if the seqs are mis-ordered
- if (seq < _last_seqs[index]){
- _buffs[index]->clear();
- _there_was_a_clear = true;
- } _last_seqs[index] = seq;
- return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq));
- }
-
- UHD_INLINE bool pop_elems_with_timed_wait(
- std::vector<elem_type> &elems, double timeout
- ){
- boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout);
- buff_contents_type buff_contents_tmp;
- std::list<size_t> indexes_to_do(_all_indexes);
-
- //do an initial pop to load an initial sequence id
- size_t index = indexes_to_do.front();
- if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
- )) return false;
- elems[index] = buff_contents_tmp.first;
- seq_type expected_seq_id = buff_contents_tmp.second;
- indexes_to_do.pop_front();
-
- //get an aligned set of elements from the buffers:
- while(indexes_to_do.size() != 0){
-
- //respond to a clear by starting from scratch
- if(_there_was_a_clear){
- _there_was_a_clear = false;
- indexes_to_do = _all_indexes;
- index = indexes_to_do.front();
- if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
- )) return false;
- elems[index] = buff_contents_tmp.first;
- expected_seq_id = buff_contents_tmp.second;
- indexes_to_do.pop_front();
- }
-
- //pop an element off for this index
- index = indexes_to_do.front();
- if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
- )) return false;
-
- //if the sequence id matches:
- // store the popped element into the output,
- // remove this index from the list and continue
- if (buff_contents_tmp.second == expected_seq_id){
- elems[index] = buff_contents_tmp.first;
- indexes_to_do.pop_front();
- continue;
- }
-
- //if the sequence id is older:
- // continue with the same index to try again
- if (buff_contents_tmp.second < expected_seq_id){
- continue;
- }
-
- //if the sequence id is newer:
- // store the popped element into the output,
- // add all other indexes back into the list
- if (buff_contents_tmp.second > expected_seq_id){
- elems[index] = buff_contents_tmp.first;
- expected_seq_id = buff_contents_tmp.second;
- indexes_to_do = _all_indexes;
- indexes_to_do.remove(index);
- continue;
- }
- }
- return true;
- }
-
- private:
- //a vector of bounded buffers for each index
- typedef std::pair<elem_type, seq_type> buff_contents_type;
- std::vector<typename bounded_buffer<buff_contents_type>::sptr> _buffs;
- std::vector<seq_type> _last_seqs;
- std::list<size_t> _all_indexes;
- bool _there_was_a_clear;
- };
-
-}}} //namespace
-
-namespace uhd{ namespace transport{
-
- template <typename elem_type, typename seq_type>
- typename alignment_buffer<elem_type, seq_type>::sptr
- alignment_buffer<elem_type, seq_type>::make(size_t capacity, size_t width){
- return typename alignment_buffer<elem_type, seq_type>::sptr(
- new alignment_buffer_impl<elem_type, seq_type>(capacity, width)
- );
- }
-
-}} //namespace
-
-#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_IPP */
diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp
index edc7faa06..f7915d866 100644
--- a/host/include/uhd/transport/bounded_buffer.ipp
+++ b/host/include/uhd/transport/bounded_buffer.ipp
@@ -26,14 +26,6 @@
namespace uhd{ namespace transport{ namespace{ /*anon*/
- static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
- return boost::posix_time::microseconds(long(timeout*1e6));
- }
-
- static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){
- return 1e-6*time_dur.total_microseconds();
- }
-
template <typename elem_type>
class bounded_buffer_impl : public bounded_buffer<elem_type>{
public:
@@ -127,6 +119,11 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
_buffer.pop_back();
return elem;
}
+
+ static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
+ return boost::posix_time::microseconds(long(timeout*1e6));
+ }
+
};
}}} //namespace
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index cbc0a0817..c8e4b7096 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -21,11 +21,13 @@
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/transport/convert_types.hpp>
-#include <uhd/transport/alignment_buffer.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <iostream>
+#include <list>
using namespace uhd;
using namespace uhd::usrp;
@@ -108,16 +110,24 @@ private:
* - vrt packet handler states
**********************************************************************/
struct usrp2_impl::io_impl{
- typedef alignment_buffer<managed_recv_buffer::sptr, time_spec_t> alignment_buffer_type;
- io_impl(size_t num_recv_frames, size_t send_frame_size, size_t width):
+ io_impl(size_t send_frame_size, size_t width):
packet_handler_recv_state(width),
- recv_pirate_booty(alignment_buffer_type::make(num_recv_frames-3, width)),
async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/))
{
- for (size_t i = 0; i < width; i++) fc_mons.push_back(
- flow_control_monitor::sptr(new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size))
- );
+ for (size_t i = 0; i < width; i++){
+ fc_mons.push_back(flow_control_monitor::sptr(
+ new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size)
+ ));
+ //init empty packet infos
+ vrt::if_packet_info_t packet_info;
+ packet_info.packet_count = 0;
+ packet_info.has_tsi = true;
+ packet_info.tsi = 0;
+ packet_info.has_tsf = true;
+ packet_info.tsf = 0;
+ prev_infos.push_back(packet_info);
+ }
}
~io_impl(void){
@@ -126,11 +136,6 @@ struct usrp2_impl::io_impl{
recv_pirate_crew.join_all();
}
- bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout);
- }
-
bool get_send_buffs(
const std::vector<zero_copy_if::sptr> &trans,
vrt_packet_handler::managed_send_buffs_t &buffs,
@@ -151,6 +156,15 @@ struct usrp2_impl::io_impl{
return true;
}
+ bool get_recv_buffs(
+ const std::vector<zero_copy_if::sptr> xports,
+ vrt_packet_handler::managed_recv_buffs_t &buffs,
+ double timeout
+ );
+
+ //previous state for each buffer
+ std::vector<vrt::if_packet_info_t> prev_infos;
+
//flow control monitors
std::vector<flow_control_monitor::sptr> fc_mons;
@@ -162,29 +176,28 @@ struct usrp2_impl::io_impl{
void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);
boost::thread_group recv_pirate_crew;
bool recv_pirate_crew_raiding;
- alignment_buffer_type::sptr recv_pirate_booty;
bounded_buffer<async_metadata_t>::sptr async_msg_fifo;
boost::mutex spawn_mutex;
};
/***********************************************************************
* Receive Pirate Loop
- * - while raiding, loot for recv buffers
- * - put booty into the alignment buffer
+ * - while raiding, loot for message packet
+ * - update flow control condition count
+ * - put async message packets into queue
**********************************************************************/
void usrp2_impl::io_impl::recv_pirate_loop(
- zero_copy_if::sptr zc_if,
+ zero_copy_if::sptr zc_if_err0,
usrp2_mboard_impl::sptr mboard,
size_t index
){
set_thread_priority_safe();
recv_pirate_crew_raiding = true;
- size_t next_packet_seq = 0;
spawn_mutex.unlock();
while(recv_pirate_crew_raiding){
- managed_recv_buffer::sptr buff = zc_if->get_recv_buff();
+ managed_recv_buffer::sptr buff = zc_if_err0->get_recv_buff();
if (not buff.get()) continue; //ignore timeout/error buffers
try{
@@ -194,26 +207,6 @@ void usrp2_impl::io_impl::recv_pirate_loop(
const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>();
vrt::if_hdr_unpack_be(vrt_hdr, if_packet_info);
- //handle the rx data stream
- if (if_packet_info.sid == usrp2_impl::RECV_SID){
- //handle the packet count / sequence number
- if (if_packet_info.packet_count != next_packet_seq){
- //std::cerr << "S" << (if_packet_info.packet_count - next_packet_seq)%16;
- std::cerr << "O" << std::flush; //report overflow (drops in the kernel)
- }
- next_packet_seq = (if_packet_info.packet_count+1)%16;
-
- //extract the timespec and round to the nearest packet
- UHD_ASSERT_THROW(if_packet_info.has_tsi and if_packet_info.has_tsf);
- time_spec_t time(
- time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq()
- );
-
- //push the packet into the buffer with the new time
- recv_pirate_booty->push_with_pop_on_full(buff, time, index);
- continue;
- }
-
//handle a tx async report message
if (if_packet_info.sid == usrp2_impl::ASYNC_SID and if_packet_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
@@ -253,11 +246,10 @@ void usrp2_impl::io_impl::recv_pirate_loop(
void usrp2_impl::io_init(void){
//the assumption is that all data transports should be identical
- const size_t num_recv_frames = _data_transports.front()->get_num_recv_frames();
const size_t send_frame_size = _data_transports.front()->get_send_frame_size();
//create new io impl
- _io_impl = UHD_PIMPL_MAKE(io_impl, (num_recv_frames, send_frame_size, _data_transports.size()));
+ _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports.size()));
//TODO temporary fix for weird power up state, remove when FPGA fixed
{
@@ -276,7 +268,7 @@ void usrp2_impl::io_init(void){
//spawn a new pirate to plunder the recv booty
_io_impl->recv_pirate_crew.create_thread(boost::bind(
&usrp2_impl::io_impl::recv_pirate_loop,
- _io_impl.get(), _data_transports.at(i),
+ _io_impl.get(), _err0_transports.at(i),
_mboards.at(i), i
));
//block here until the spawned thread unlocks
@@ -328,6 +320,117 @@ size_t usrp2_impl::send(
}
/***********************************************************************
+ * Alignment logic on receive
+ **********************************************************************/
+static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
+ return boost::posix_time::microseconds(long(timeout*1e6));
+}
+
+static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){
+ return 1e-6*time_dur.total_microseconds();
+}
+
+static UHD_INLINE time_spec_t extract_time_spec(const vrt::if_packet_info_t &packet_info){
+ return time_spec_t( //assumes has_tsi and has_tsf are true
+ time_t(packet_info.tsi), size_t(packet_info.tsf),
+ 100e6 //tick rate does not have to be correct for comparison purposes
+ );
+}
+
+static UHD_INLINE void extract_packet_info(
+ managed_recv_buffer::sptr buff,
+ vrt::if_packet_info_t &prev_info,
+ time_spec_t &time, bool &clear
+){
+ //extract packet info
+ vrt::if_packet_info_t next_info;
+ vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), next_info);
+
+ //handle the packet count / sequence number
+ if ((prev_info.packet_count+1)%16 != next_info.packet_count){
+ std::cerr << "O" << std::flush; //report overflow (drops in the kernel)
+ }
+
+ time = extract_time_spec(next_info);
+ clear = extract_time_spec(prev_info) > time;
+ prev_info = next_info;
+}
+
+UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs(
+ const std::vector<zero_copy_if::sptr> xports,
+ vrt_packet_handler::managed_recv_buffs_t &buffs,
+ double timeout
+){
+ if (buffs.size() == 1){
+ buffs[0] = xports[0]->get_recv_buff(timeout);
+ if (buffs[0].get() == NULL) return false;
+ bool clear; time_spec_t time; //unused variables
+ //call extract_packet_info to handle printing the overflows
+ extract_packet_info(buffs[0], this->prev_infos[0], time, clear);
+ return true;
+ }
+ //-------------------- begin alignment logic ---------------------//
+ boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout);
+ managed_recv_buffer::sptr buff_tmp;
+ std::list<size_t> _all_indexes, indexes_to_do;
+ for (size_t i = 0; i < buffs.size(); i++) _all_indexes.push_back(i);
+ bool clear;
+ time_spec_t expected_time;
+
+ //respond to a clear by starting from scratch
+ got_clear:
+ indexes_to_do = _all_indexes;
+ clear = false;
+
+ //do an initial pop to load an initial sequence id
+ size_t index = indexes_to_do.front();
+ buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time()));
+ if (buff_tmp.get() == NULL) return false;
+ extract_packet_info(buff_tmp, this->prev_infos[index], expected_time, clear);
+ if (clear) goto got_clear;
+ buffs[index] = buff_tmp;
+ indexes_to_do.pop_front();
+
+ //get an aligned set of elements from the buffers:
+ while(indexes_to_do.size() != 0){
+
+ //pop an element off for this index
+ index = indexes_to_do.front();
+ buff_tmp = xports[index]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time()));
+ if (buff_tmp.get() == NULL) return false;
+ time_spec_t this_time;
+ extract_packet_info(buff_tmp, this->prev_infos[index], this_time, clear);
+ if (clear) goto got_clear;
+ buffs[index] = buff_tmp;
+
+ //if the sequence id matches:
+ // remove this index from the list and continue
+ if (this_time == expected_time){
+ indexes_to_do.pop_front();
+ continue;
+ }
+
+ //if the sequence id is older:
+ // continue with the same index to try again
+ else if (this_time < expected_time){
+ continue;
+ }
+
+ //if the sequence id is newer:
+ // use the new expected time for comparison
+ // add all other indexes back into the list
+ else{
+ expected_time = this_time;
+ indexes_to_do = _all_indexes;
+ indexes_to_do.remove(index);
+ continue;
+ }
+ }
+ return true;
+ //-------------------- end alignment logic -----------------------//
+}
+
+/***********************************************************************
* Receive Data
**********************************************************************/
size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{
@@ -357,7 +460,7 @@ size_t usrp2_impl::recv(
io_type, _rx_otw_type, //input and output types to convert
_mboards.front()->get_master_clock_freq(), //master clock tick rate
uhd::transport::vrt::if_hdr_unpack_be,
- boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout),
+ boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _data_transports, _1, timeout),
boost::bind(&handle_overflow, _mboards, _1)
);
}
diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp
index 8445412e7..e7bc88699 100644
--- a/host/test/buffer_test.cpp
+++ b/host/test/buffer_test.cpp
@@ -17,7 +17,6 @@
#include <boost/test/unit_test.hpp>
#include <uhd/transport/bounded_buffer.hpp>
-#include <uhd/transport/alignment_buffer.hpp>
#include <boost/assign/list_of.hpp>
using namespace boost::assign;
@@ -63,53 +62,3 @@ BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){
BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
BOOST_CHECK_EQUAL(val, 3);
}
-
-BOOST_AUTO_TEST_CASE(test_alignment_buffer){
- alignment_buffer<int, size_t>::sptr ab(alignment_buffer<int, size_t>::make(7, 3));
- //load index 0 with all good seq numbers
- BOOST_CHECK(ab->push_with_pop_on_full(0, 0, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(1, 1, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(2, 2, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(3, 3, 0));
- BOOST_CHECK(ab->push_with_pop_on_full(4, 4, 0));
-
- //load index 1 with some skipped seq numbers
- BOOST_CHECK(ab->push_with_pop_on_full(10, 0, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(11, 1, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(14, 4, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(15, 5, 1));
- BOOST_CHECK(ab->push_with_pop_on_full(16, 6, 1));
-
- //load index 2 with all good seq numbers
- BOOST_CHECK(ab->push_with_pop_on_full(20, 0, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(21, 1, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(22, 2, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(23, 3, 2));
- BOOST_CHECK(ab->push_with_pop_on_full(24, 4, 2));
-
- //readback aligned values
- std::vector<int> aligned_elems(3);
-
- static const std::vector<int> expected_elems0 = list_of(0)(10)(20);
- BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
- BOOST_CHECK_EQUAL_COLLECTIONS(
- aligned_elems.begin(), aligned_elems.end(),
- expected_elems0.begin(), expected_elems0.end()
- );
-
- static const std::vector<int> expected_elems1 = list_of(1)(11)(21);
- BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
- BOOST_CHECK_EQUAL_COLLECTIONS(
- aligned_elems.begin(), aligned_elems.end(),
- expected_elems1.begin(), expected_elems1.end()
- );
-
- //there was a skip now find 4
-
- static const std::vector<int> expected_elems4 = list_of(4)(14)(24);
- BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
- BOOST_CHECK_EQUAL_COLLECTIONS(
- aligned_elems.begin(), aligned_elems.end(),
- expected_elems4.begin(), expected_elems4.end()
- );
-}