aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/include/uhd/transport/CMakeLists.txt2
-rw-r--r--host/include/uhd/transport/alignment_buffer.hpp134
-rw-r--r--host/include/uhd/transport/bounded_buffer.hpp146
-rw-r--r--host/include/uhd/transport/zero_copy.hpp19
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp88
-rw-r--r--host/lib/transport/vrt_packet_handler.hpp27
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp77
-rw-r--r--host/lib/usrp/usrp2/usrp2_impl.cpp3
-rw-r--r--host/lib/usrp/usrp2/usrp2_impl.hpp5
-rw-r--r--host/test/CMakeLists.txt1
-rw-r--r--host/test/buffer_test.cpp115
11 files changed, 559 insertions, 58 deletions
diff --git a/host/include/uhd/transport/CMakeLists.txt b/host/include/uhd/transport/CMakeLists.txt
index 4cefffa24..23a4aae94 100644
--- a/host/include/uhd/transport/CMakeLists.txt
+++ b/host/include/uhd/transport/CMakeLists.txt
@@ -17,6 +17,8 @@
INSTALL(FILES
+ alignment_buffer.hpp
+ bounded_buffer.hpp
convert_types.hpp
if_addrs.hpp
udp_simple.hpp
diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp
new file mode 100644
index 000000000..dc6ccc3ed
--- /dev/null
+++ b/host/include/uhd/transport/alignment_buffer.hpp
@@ -0,0 +1,134 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
+#define INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/shared_ptr.hpp>
+#include <utility>
+#include <vector>
+
+namespace uhd{ namespace transport{
+
+ /*!
+ * Imlement a templated alignment buffer:
+ * Used for aligning asynchronously pushed elements with matching ids.
+ */
+ template <typename elem_type, typename seq_type> class alignment_buffer{
+ public:
+ typedef boost::shared_ptr<alignment_buffer<elem_type, seq_type> > sptr;
+
+ /*!
+ * Make a new alignment buffer object.
+ * \param capacity the maximum elements per index
+ * \param width the number of elements to align
+ */
+ static sptr make(size_t capacity, size_t width){
+ return sptr(new alignment_buffer(capacity, width));
+ }
+
+ /*!
+ * Push an element with sequence id into the buffer at index.
+ * \param elem the element to push
+ * \param seq the sequence identifier
+ * \param index the buffer index
+ * \return true if the element fit without popping for space
+ */
+ UHD_INLINE bool push_with_pop_on_full(
+ const elem_type &elem,
+ const seq_type &seq,
+ size_t index
+ ){
+ return _buffs[index]->push_with_pop_on_full(buff_contents_type(elem, seq));
+ }
+
+ /*!
+ * Pop an aligned set of elements from this alignment buffer.
+ * \param elems a collection to store the aligned elements
+ * \param time the timeout time
+ * \return false when the operation times out
+ */
+ template <typename elems_type, typename time_type>
+ bool pop_elems_with_timed_wait(elems_type &elems, const time_type &time){
+ buff_contents_type buff_contents_tmp;
+ std::list<size_t> indexes_to_do(_all_indexes);
+
+ //do an initial pop to load an initial sequence id
+ size_t index = indexes_to_do.front();
+ if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false;
+ elems[index] = buff_contents_tmp.first;
+ seq_type expected_seq_id = buff_contents_tmp.second;
+ indexes_to_do.pop_front();
+
+ //get an aligned set of elements from the buffers:
+ while(indexes_to_do.size() != 0){
+ //pop an element off for this index
+ index = indexes_to_do.front();
+ if (not _buffs[index]->pop_with_timed_wait(buff_contents_tmp, time)) return false;
+
+ //if the sequence id matches:
+ // store the popped element into the output,
+ // remove this index from the list and continue
+ if (buff_contents_tmp.second == expected_seq_id){
+ elems[index] = buff_contents_tmp.first;
+ indexes_to_do.pop_front();
+ continue;
+ }
+
+ //if the sequence id is older:
+ // continue with the same index to try again
+ if (buff_contents_tmp.second < expected_seq_id){
+ continue;
+ }
+
+ //if the sequence id is newer:
+ // store the popped element into the output,
+ // add all other indexes back into the list
+ if (buff_contents_tmp.second > expected_seq_id){
+ elems[index] = buff_contents_tmp.first;
+ expected_seq_id = buff_contents_tmp.second;
+ indexes_to_do = _all_indexes;
+ indexes_to_do.remove(index);
+ continue;
+ }
+ }
+ return true;
+ }
+
+ private:
+ //a vector of bounded buffers for each index
+ typedef std::pair<elem_type, seq_type> buff_contents_type;
+ typedef bounded_buffer<buff_contents_type> bounded_buffer_type;
+ typedef boost::shared_ptr<bounded_buffer_type> bounded_buffer_sptr;
+ std::vector<bounded_buffer_sptr> _buffs;
+ std::list<size_t> _all_indexes;
+
+ //private constructor
+ alignment_buffer(size_t capacity, size_t width){
+ for (size_t i = 0; i < width; i++){
+ _buffs.push_back(bounded_buffer_type::make(capacity));
+ _all_indexes.push_back(i);
+ }
+ }
+ };
+
+}} //namespace
+
+#endif /* INCLUDED_UHD_TRANSPORT_ALIGNMENT_BUFFER_HPP */
diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp
new file mode 100644
index 000000000..baecd6382
--- /dev/null
+++ b/host/include/uhd/transport/bounded_buffer.hpp
@@ -0,0 +1,146 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP
+#define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP
+
+#include <uhd/config.hpp>
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/circular_buffer.hpp>
+#include <boost/thread/condition.hpp>
+
+namespace uhd{ namespace transport{
+
+ /*!
+ * Imlement a templated bounded buffer:
+ * Used for passing elements between threads in a producer-consumer model.
+ * The bounded buffer implemented waits and timed waits with condition variables.
+ * The pop operation blocks on the bounded_buffer to become non empty.
+ * The push operation blocks on the bounded_buffer to become non full.
+ */
+ template <typename elem_type> class bounded_buffer{
+ public:
+ typedef boost::shared_ptr<bounded_buffer<elem_type> > sptr;
+
+ /*!
+ * Make a new bounded buffer object.
+ * \param capacity the bounded_buffer capacity
+ */
+ static sptr make(size_t capacity){
+ return sptr(new bounded_buffer(capacity));
+ }
+
+ /*!
+ * Push a new element into the bounded buffer.
+ * If the buffer is full prior to the push,
+ * make room by poping the oldest element.
+ * \param elem the new element to push
+ * \return true if the element fit without popping for space
+ */
+ UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if(_buffer.full()){
+ _buffer.pop_back();
+ _buffer.push_front(elem);
+ lock.unlock();
+ _empty_cond.notify_one();
+ return false;
+ }
+ else{
+ _buffer.push_front(elem);
+ lock.unlock();
+ _empty_cond.notify_one();
+ return true;
+ }
+ }
+
+ /*!
+ * Push a new element into the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-full.
+ * \param elem the new element to push
+ */
+ UHD_INLINE void push_with_wait(const elem_type &elem){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _full_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_full, this));
+ _buffer.push_front(elem);
+ lock.unlock();
+ _empty_cond.notify_one();
+ }
+
+ /*!
+ * Push a new element into the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-full or timeout.
+ * \param elem the new element to push
+ * \param time the timeout time
+ * \return false when the operation times out
+ */
+ template<typename time_type> UHD_INLINE
+ bool push_with_timed_wait(const elem_type &elem, const time_type &time){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_full, this))) return false;
+ _buffer.push_front(elem);
+ lock.unlock();
+ _empty_cond.notify_one();
+ return true;
+ }
+
+ /*!
+ * Pop an element from the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-empty.
+ * \param elem the element reference pop to
+ */
+ UHD_INLINE void pop_with_wait(elem_type &elem){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ _empty_cond.wait(lock, boost::bind(&bounded_buffer<elem_type>::not_empty, this));
+ elem = _buffer.back(); _buffer.pop_back();
+ lock.unlock();
+ _full_cond.notify_one();
+ }
+
+ /*!
+ * Pop an element from the bounded_buffer.
+ * Wait until the bounded_buffer becomes non-empty or timeout.
+ * \param elem the element reference pop to
+ * \param time the timeout time
+ * \return false when the operation times out
+ */
+ template<typename time_type> UHD_INLINE
+ bool pop_with_timed_wait(elem_type &elem, const time_type &time){
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer<elem_type>::not_empty, this))) return false;
+ elem = _buffer.back(); _buffer.pop_back();
+ lock.unlock();
+ _full_cond.notify_one();
+ return true;
+ }
+
+ private:
+ boost::mutex _mutex;
+ boost::condition _empty_cond, _full_cond;
+ boost::circular_buffer<elem_type> _buffer;
+
+ bool not_full(void) const{return not _buffer.full();}
+ bool not_empty(void) const{return not _buffer.empty();}
+
+ //private constructor
+ bounded_buffer(size_t capacity) : _buffer(capacity){}
+ };
+
+}} //namespace
+
+#endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP */
diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp
index 2efabaccf..d6eb89a91 100644
--- a/host/include/uhd/transport/zero_copy.hpp
+++ b/host/include/uhd/transport/zero_copy.hpp
@@ -124,9 +124,28 @@ namespace uhd{ namespace transport{
virtual managed_recv_buffer::sptr get_recv_buff(void) = 0;
/*!
+ * Get the maximum number of receive frames:
+ * The maximum number of valid managed recv buffers,
+ * or the maximum number of frames in the ring buffer,
+ * depending upon the underlying implementation.
+ * \return number of frames
+ */
+ virtual size_t get_num_recv_frames(void) const = 0;
+
+ /*!
* Get a new send buffer from this transport object.
*/
virtual managed_send_buffer::sptr get_send_buff(void) = 0;
+
+ /*!
+ * Get the maximum number of send frames:
+ * The maximum number of valid managed send buffers,
+ * or the maximum number of frames in the ring buffer,
+ * depending upon the underlying implementation.
+ * \return number of frames
+ */
+ virtual size_t get_num_send_frames(void) const = 0;
+
};
/*!
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 0c604811a..ffd1a4d65 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -20,6 +20,7 @@
#include <boost/cstdint.hpp>
#include <boost/asio.hpp>
#include <boost/format.hpp>
+#include <boost/thread.hpp>
#include <iostream>
using namespace uhd::transport;
@@ -28,8 +29,7 @@ using namespace uhd::transport;
* Constants
**********************************************************************/
static const size_t MIN_SOCK_BUFF_SIZE = size_t(100e3);
-static const size_t MAX_DGRAM_SIZE = 2048; //assume max size on send and recv
-static const double RECV_TIMEOUT = 0.1; // 100 ms
+static const size_t MAX_DGRAM_SIZE = 1500; //assume max size on send and recv
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
@@ -46,12 +46,32 @@ class udp_zero_copy_impl:
public:
typedef boost::shared_ptr<udp_zero_copy_impl> sptr;
- //structors
- udp_zero_copy_impl(const std::string &addr, const std::string &port);
- ~udp_zero_copy_impl(void);
+ udp_zero_copy_impl(
+ const std::string &addr,
+ const std::string &port
+ ):
+ phony_zero_copy_recv_if(MAX_DGRAM_SIZE),
+ phony_zero_copy_send_if(MAX_DGRAM_SIZE)
+ {
+ //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
+
+ // resolve the address
+ boost::asio::ip::udp::resolver resolver(_io_service);
+ boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
+ boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
+
+ // create, open, and connect the socket
+ _socket = new boost::asio::ip::udp::socket(_io_service);
+ _socket->open(boost::asio::ip::udp::v4());
+ _socket->connect(receiver_endpoint);
+ }
+
+ ~udp_zero_copy_impl(void){
+ delete _socket;
+ }
//get size for internal socket buffer
- template <typename Opt> size_t get_buff_size(void){
+ template <typename Opt> size_t get_buff_size(void) const{
Opt option;
_socket->get_option(option);
return option.value();
@@ -64,12 +84,33 @@ public:
return get_buff_size<Opt>();
}
+
+ //The number of frames is approximately the buffer size divided by the max datagram size.
+ //In reality, this is a phony zero-copy interface and the number of frames is infinite.
+ //However, its sensible to advertise a frame count that is approximate to buffer size.
+ //This way, the transport caller will have an idea about how much buffering to create.
+
+ size_t get_num_recv_frames(void) const{
+ return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/MAX_DGRAM_SIZE;
+ }
+
+ size_t get_num_send_frames(void) const{
+ return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/MAX_DGRAM_SIZE;
+ }
+
private:
boost::asio::ip::udp::socket *_socket;
boost::asio::io_service _io_service;
size_t recv(const boost::asio::mutable_buffer &buff){
- return _socket->receive(boost::asio::buffer(buff));
+ boost::asio::deadline_timer timer(_socket->get_io_service());
+ timer.expires_from_now(boost::posix_time::milliseconds(100));
+ while (not (_socket->available() or timer.expires_from_now().is_negative())){
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1));
+ }
+
+ if (_socket->available()) return _socket->receive(boost::asio::buffer(buff));
+ return 0; //no bytes available, timeout...
}
size_t send(const boost::asio::const_buffer &buff){
@@ -77,41 +118,10 @@ private:
}
};
-udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port):
- phony_zero_copy_recv_if(MAX_DGRAM_SIZE),
- phony_zero_copy_send_if(MAX_DGRAM_SIZE)
-{
- //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
-
- // resolve the address
- boost::asio::ip::udp::resolver resolver(_io_service);
- boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
- boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
-
- // create, open, and connect the socket
- _socket = new boost::asio::ip::udp::socket(_io_service);
- _socket->open(boost::asio::ip::udp::v4());
- _socket->connect(receiver_endpoint);
-
- // set recv timeout
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = size_t(RECV_TIMEOUT*1e6);
- UHD_ASSERT_THROW(setsockopt(
- _socket->native(),
- SOL_SOCKET, SO_RCVTIMEO,
- (const char *)&tv, sizeof(timeval)
- ) == 0);
-}
-
-udp_zero_copy_impl::~udp_zero_copy_impl(void){
- delete _socket;
-}
-
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
-template<typename Opt> static inline void resize_buff_helper(
+template<typename Opt> static void resize_buff_helper(
udp_zero_copy_impl::sptr udp_trans,
size_t target_size,
const std::string &name
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp
index e64e3383d..d6b863040 100644
--- a/host/lib/transport/vrt_packet_handler.hpp
+++ b/host/lib/transport/vrt_packet_handler.hpp
@@ -54,6 +54,8 @@ namespace vrt_packet_handler{
}
};
+ typedef boost::function<uhd::transport::managed_recv_buffer::sptr(void)> get_recv_buff_t;
+
typedef boost::function<void(uhd::transport::managed_recv_buffer::sptr)> recv_cb_t;
static UHD_INLINE void recv_cb_nop(uhd::transport::managed_recv_buffer::sptr){
@@ -112,15 +114,16 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_recv_buff_t &get_recv_buff,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32,
- const recv_cb_t& recv_cb
+ const recv_cb_t &recv_cb
){
//perform a receive if no rx data is waiting to be copied
if (boost::asio::buffer_size(state.copy_buff) == 0){
state.fragment_offset_in_samps = 0;
- state.managed_buff = zc_iface->get_recv_buff();
+ state.managed_buff = get_recv_buff();
+ if (state.managed_buff.get() == NULL) return 0;
recv_cb(state.managed_buff); //callback before vrt unpack
try{
_recv1_helper(
@@ -169,7 +172,7 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_recv_buff_t &get_recv_buff,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32 = 0,
const recv_cb_t& recv_cb = &recv_cb_nop
@@ -189,7 +192,7 @@ namespace vrt_packet_handler{
metadata,
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_recv_buff,
vrt_header_offset_words32,
recv_cb
);
@@ -208,7 +211,7 @@ namespace vrt_packet_handler{
(accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_recv_buff,
vrt_header_offset_words32,
recv_cb
);
@@ -234,6 +237,8 @@ namespace vrt_packet_handler{
}
};
+ typedef boost::function<uhd::transport::managed_send_buffer::sptr(void)> get_send_buff_t;
+
typedef boost::function<void(uhd::transport::managed_send_buffer::sptr)> send_cb_t;
static UHD_INLINE void send_cb_nop(uhd::transport::managed_send_buffer::sptr){
@@ -252,12 +257,12 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_send_buff_t &get_send_buff,
size_t vrt_header_offset_words32,
const send_cb_t& send_cb
){
//get a new managed send buffer
- uhd::transport::managed_send_buffer::sptr send_buff = zc_iface->get_send_buff();
+ uhd::transport::managed_send_buffer::sptr send_buff = get_send_buff();
boost::uint32_t *tx_mem = send_buff->cast<boost::uint32_t *>() + vrt_header_offset_words32;
size_t num_header_words32, num_packet_words32;
@@ -298,7 +303,7 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_send_buff_t &get_send_buff,
size_t max_samples_per_packet,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32 = 0,
@@ -319,7 +324,7 @@ namespace vrt_packet_handler{
metadata,
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_send_buff,
vrt_header_offset_words32,
send_cb
);
@@ -353,7 +358,7 @@ namespace vrt_packet_handler{
md,
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_send_buff,
vrt_header_offset_words32,
send_cb
);
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index 79b18fb63..efd64d4ab 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -15,11 +15,15 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
+#include "../../transport/vrt_packet_handler.hpp"
#include "usrp2_impl.hpp"
#include "usrp2_regs.hpp"
#include <uhd/transport/convert_types.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
#include <boost/format.hpp>
#include <boost/asio.hpp> //htonl and ntohl
+#include <boost/bind.hpp>
+#include <boost/thread.hpp>
#include <iostream>
using namespace uhd;
@@ -28,6 +32,60 @@ using namespace uhd::transport;
namespace asio = boost::asio;
/***********************************************************************
+ * io impl details (internal to this file)
+ **********************************************************************/
+struct usrp2_impl::io_impl{
+
+ io_impl(zero_copy_if::sptr zc_if);
+ ~io_impl(void);
+
+ managed_recv_buffer::sptr get_recv_buff(void);
+
+ //state management for the vrt packet handler code
+ vrt_packet_handler::recv_state packet_handler_recv_state;
+ vrt_packet_handler::send_state packet_handler_send_state;
+
+ //methods and variables for the recv pirate
+ void recv_pirate_loop(zero_copy_if::sptr zc_if);
+ boost::thread *recv_pirate_thread; bool recv_pirate_running;
+ bounded_buffer<managed_recv_buffer::sptr>::sptr recv_pirate_booty;
+};
+
+usrp2_impl::io_impl::io_impl(zero_copy_if::sptr zc_if){
+ //create a large enough booty
+ size_t num_frames = zc_if->get_num_recv_frames();
+ std::cout << "Recv pirate num frames: " << num_frames << std::endl;
+ recv_pirate_booty = bounded_buffer<managed_recv_buffer::sptr>::make(num_frames);
+
+ //create a new pirate thread (yarr!!)
+ recv_pirate_thread = new boost::thread(
+ boost::bind(&usrp2_impl::io_impl::recv_pirate_loop, this, zc_if)
+ );
+}
+
+usrp2_impl::io_impl::~io_impl(void){
+ recv_pirate_running = false;
+ recv_pirate_thread->interrupt();
+ recv_pirate_thread->join();
+ delete recv_pirate_thread;
+}
+
+managed_recv_buffer::sptr usrp2_impl::io_impl::get_recv_buff(void){
+ managed_recv_buffer::sptr buff;
+ recv_pirate_booty->pop_with_timed_wait(buff, boost::posix_time::milliseconds(100));
+ //timeout means a null sptr...
+ return buff;
+}
+
+void usrp2_impl::io_impl::recv_pirate_loop(zero_copy_if::sptr zc_if){
+ recv_pirate_running = true;
+ while(recv_pirate_running){
+ managed_recv_buffer::sptr buff = zc_if->get_recv_buff();
+ if (buff->size()) recv_pirate_booty->push_with_pop_on_full(buff);
+ }
+}
+
+/***********************************************************************
* Helper Functions
**********************************************************************/
void usrp2_impl::io_init(void){
@@ -60,11 +118,22 @@ void usrp2_impl::io_init(void){
);
_iface->poke32(FR_RX_CTRL_VRT_STREAM_ID, 0);
_iface->poke32(FR_RX_CTRL_VRT_TRAILER, 0);
+
+ //create new io impl
+ _io_impl = new io_impl(_data_transport);
+}
+
+void usrp2_impl::io_done(void){
+ delete _io_impl;
}
/***********************************************************************
* Send Data
**********************************************************************/
+static inline managed_send_buffer::sptr get_send_buff(zero_copy_if::sptr zc_if){
+ return zc_if->get_send_buff();
+}
+
size_t usrp2_impl::send(
const asio::const_buffer &buff,
const tx_metadata_t &metadata,
@@ -72,11 +141,11 @@ size_t usrp2_impl::send(
send_mode_t send_mode
){
return vrt_packet_handler::send(
- _packet_handler_send_state, //last state of the send handler
+ _io_impl->packet_handler_send_state, //last state of the send handler
buff, metadata, send_mode, //buffer to empty and samples metadata
io_type, _tx_otw_type, //input and output types to convert
get_master_clock_freq(), //master clock tick rate
- _data_transport, //zero copy interface
+ boost::bind(get_send_buff, _data_transport),
get_max_send_samps_per_packet()
);
}
@@ -91,10 +160,10 @@ size_t usrp2_impl::recv(
recv_mode_t recv_mode
){
return vrt_packet_handler::recv(
- _packet_handler_recv_state, //last state of the recv handler
+ _io_impl->packet_handler_recv_state, //last state of the recv handler
buff, metadata, recv_mode, //buffer to fill and samples metadata
io_type, _rx_otw_type, //input and output types to convert
get_master_clock_freq(), //master clock tick rate
- _data_transport //zero copy interface
+ boost::bind(&usrp2_impl::io_impl::get_recv_buff, _io_impl)
);
}
diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp
index af3ec216a..7f79c483b 100644
--- a/host/lib/usrp/usrp2/usrp2_impl.cpp
+++ b/host/lib/usrp/usrp2/usrp2_impl.cpp
@@ -185,7 +185,8 @@ usrp2_impl::usrp2_impl(
}
usrp2_impl::~usrp2_impl(void){
- /* NOP */
+ //cleanup the send and recv io
+ io_done();
}
/***********************************************************************
diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp
index 7948a2069..4b6805217 100644
--- a/host/lib/usrp/usrp2/usrp2_impl.hpp
+++ b/host/lib/usrp/usrp2/usrp2_impl.hpp
@@ -33,7 +33,6 @@
#include <uhd/transport/vrt.hpp>
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/usrp/dboard_manager.hpp>
-#include "../../transport/vrt_packet_handler.hpp"
/*!
* Make a usrp2 dboard interface.
@@ -153,10 +152,10 @@ private:
uhd::transport::vrt::max_header_words32*sizeof(boost::uint32_t)
;
- vrt_packet_handler::recv_state _packet_handler_recv_state;
- vrt_packet_handler::send_state _packet_handler_send_state;
uhd::otw_type_t _rx_otw_type, _tx_otw_type;
+ struct io_impl; io_impl *_io_impl;
void io_init(void);
+ void io_done(void);
//udp transports for control and data
uhd::transport::udp_zero_copy::sptr _data_transport;
diff --git a/host/test/CMakeLists.txt b/host/test/CMakeLists.txt
index 61b0b503d..24778d13e 100644
--- a/host/test/CMakeLists.txt
+++ b/host/test/CMakeLists.txt
@@ -21,6 +21,7 @@
ADD_EXECUTABLE(main_test
main_test.cpp
addr_test.cpp
+ buffer_test.cpp
dict_test.cpp
error_test.cpp
gain_handler_test.cpp
diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp
new file mode 100644
index 000000000..aadb3f951
--- /dev/null
+++ b/host/test/buffer_test.cpp
@@ -0,0 +1,115 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <boost/test/unit_test.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/transport/alignment_buffer.hpp>
+#include <boost/assign/list_of.hpp>
+
+using namespace boost::assign;
+using namespace uhd::transport;
+
+static const boost::posix_time::milliseconds timeout(10);
+
+BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_timed_wait){
+ bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3));
+
+ //push elements, check for timeout
+ BOOST_CHECK(bb->push_with_timed_wait(0, timeout));
+ BOOST_CHECK(bb->push_with_timed_wait(1, timeout));
+ BOOST_CHECK(bb->push_with_timed_wait(2, timeout));
+ BOOST_CHECK(not bb->push_with_timed_wait(3, timeout));
+
+ int val;
+ //pop elements, check for timeout and check values
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 0);
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 1);
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 2);
+ BOOST_CHECK(not bb->pop_with_timed_wait(val, timeout));
+}
+
+BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){
+ bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3));
+
+ //push elements, check for timeout
+ BOOST_CHECK(bb->push_with_pop_on_full(0));
+ BOOST_CHECK(bb->push_with_pop_on_full(1));
+ BOOST_CHECK(bb->push_with_pop_on_full(2));
+ BOOST_CHECK(not bb->push_with_pop_on_full(3));
+
+ int val;
+ //pop elements, check for timeout and check values
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 1);
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 2);
+ BOOST_CHECK(bb->pop_with_timed_wait(val, timeout));
+ BOOST_CHECK_EQUAL(val, 3);
+}
+
+BOOST_AUTO_TEST_CASE(test_alignment_buffer){
+ alignment_buffer<int, size_t>::sptr ab(alignment_buffer<int, size_t>::make(7, 3));
+ //load index 0 with all good seq numbers
+ BOOST_CHECK(ab->push_with_pop_on_full(0, 0, 0));
+ BOOST_CHECK(ab->push_with_pop_on_full(1, 1, 0));
+ BOOST_CHECK(ab->push_with_pop_on_full(2, 2, 0));
+ BOOST_CHECK(ab->push_with_pop_on_full(3, 3, 0));
+ BOOST_CHECK(ab->push_with_pop_on_full(4, 4, 0));
+
+ //load index 1 with some skipped seq numbers
+ BOOST_CHECK(ab->push_with_pop_on_full(10, 0, 1));
+ BOOST_CHECK(ab->push_with_pop_on_full(11, 1, 1));
+ BOOST_CHECK(ab->push_with_pop_on_full(14, 4, 1));
+ BOOST_CHECK(ab->push_with_pop_on_full(15, 5, 1));
+ BOOST_CHECK(ab->push_with_pop_on_full(16, 6, 1));
+
+ //load index 2 with all good seq numbers
+ BOOST_CHECK(ab->push_with_pop_on_full(20, 0, 2));
+ BOOST_CHECK(ab->push_with_pop_on_full(21, 1, 2));
+ BOOST_CHECK(ab->push_with_pop_on_full(22, 2, 2));
+ BOOST_CHECK(ab->push_with_pop_on_full(23, 3, 2));
+ BOOST_CHECK(ab->push_with_pop_on_full(24, 4, 2));
+
+ //readback aligned values
+ std::vector<int> aligned_elems(3);
+
+ static const std::vector<int> expected_elems0 = list_of(0)(10)(20);
+ BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
+ BOOST_CHECK_EQUAL_COLLECTIONS(
+ aligned_elems.begin(), aligned_elems.end(),
+ expected_elems0.begin(), expected_elems0.end()
+ );
+
+ static const std::vector<int> expected_elems1 = list_of(1)(11)(21);
+ BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
+ BOOST_CHECK_EQUAL_COLLECTIONS(
+ aligned_elems.begin(), aligned_elems.end(),
+ expected_elems1.begin(), expected_elems1.end()
+ );
+
+ //there was a skip now find 4
+
+ static const std::vector<int> expected_elems4 = list_of(4)(14)(24);
+ BOOST_CHECK(ab->pop_elems_with_timed_wait(aligned_elems, timeout));
+ BOOST_CHECK_EQUAL_COLLECTIONS(
+ aligned_elems.begin(), aligned_elems.end(),
+ expected_elems4.begin(), expected_elems4.end()
+ );
+}