aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp27
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp85
2 files changed, 59 insertions, 53 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index c819302b6..ab48e4fc4 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -100,13 +100,13 @@ private:
lut_buff_type::sptr _completed_list;
//! a list of all transfer structs we allocated
- std::vector<libusb_transfer *> _all_luts;
+ std::vector<libusb_transfer *> _all_luts;
- //! a list of shared arrays for the transfer buffers
- std::vector<boost::shared_array<boost::uint8_t> > _buffers;
+ //! a block of memory for the transfer buffers
+ boost::shared_array<char> _buffer;
// Calls for processing asynchronous I/O
- libusb_transfer *allocate_transfer(int buff_len);
+ libusb_transfer *allocate_transfer(void *mem, size_t len);
void print_transfer_status(libusb_transfer *lut);
};
@@ -154,9 +154,9 @@ usb_endpoint::usb_endpoint(
_input(input)
{
_completed_list = lut_buff_type::make(num_transfers);
-
+ _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]);
for (size_t i = 0; i < num_transfers; i++){
- _all_luts.push_back(allocate_transfer(transfer_size));
+ _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size));
//input luts are immediately submitted to be filled
//output luts go into the completed list as free buffers
@@ -193,23 +193,23 @@ usb_endpoint::~usb_endpoint(void){
* Allocate a libusb transfer
* The allocated transfer - and buffer it contains - is repeatedly
* submitted, reaped, and reused and should not be freed until shutdown.
- * \param buff_len size of the individual buffer held by each transfer
+ * \param mem a pointer to the buffer memory
+ * \param len size of the individual buffer
* \return pointer to an allocated libusb_transfer
*/
-libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){
+libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){
libusb_transfer *lut = libusb_alloc_transfer(0);
-
- boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]);
- _buffers.push_back(buff); //store a reference to this shared array
+ UHD_ASSERT_THROW(lut != NULL);
unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0));
+ unsigned char *buff = reinterpret_cast<unsigned char *>(mem);
libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);
libusb_fill_bulk_transfer(lut, // transfer
_handle->get(), // dev_handle
endpoint, // endpoint
- buff.get(), // buffer
- buff_len, // length
+ buff, // buffer
+ len, // length
lut_callback, // callback
this, // user_data
0); // timeout
@@ -232,6 +232,7 @@ void usb_endpoint::submit(libusb_transfer *lut){
* \param lut pointer to an libusb_transfer
*/
void usb_endpoint::print_transfer_status(libusb_transfer *lut){
+ std::cout << "here " << lut->status << std::endl;
switch (lut->status) {
case LIBUSB_TRANSFER_COMPLETED:
if (lut->actual_length < lut->length) {
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 70e7514a1..a1eb516fc 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -18,6 +18,7 @@
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/assert.hpp>
#include <uhd/utils/warning.hpp>
#include <boost/shared_array.hpp>
@@ -26,6 +27,7 @@
#include <boost/thread.hpp>
#include <iostream>
+using namespace uhd;
using namespace uhd::transport;
namespace asio = boost::asio;
@@ -34,11 +36,15 @@ namespace asio = boost::asio;
**********************************************************************/
//enough buffering for half a second of samples at full rate on usrp2
static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
+
//Large buffers cause more underflow at high rates.
//Perhaps this is due to the kernel scheduling,
//but may change with host-based flow control.
static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
+//the number of async frames to allocate for each send and recv
+static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32;
+
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
* This is the portable zero copy implementation for systems
@@ -50,51 +56,52 @@ class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_share
public:
typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
- udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){
+ udp_zero_copy_asio_impl(
+ const std::string &addr, const std::string &port,
+ size_t recv_frame_size, size_t num_recv_frames,
+ size_t send_frame_size, size_t num_send_frames
+ ):
+ _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames),
+ _send_frame_size(send_frame_size), _num_send_frames(num_send_frames)
+ {
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
- // resolve the address
+ //resolve the address
asio::ip::udp::resolver resolver(_io_service);
asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
- // create, open, and connect the socket
+ //create, open, and connect the socket
_socket = new asio::ip::udp::socket(_io_service);
_socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
}
- ~udp_zero_copy_asio_impl(void){
- _io_service.stop();
- _thread_group.join_all();
- delete _socket;
- }
-
- /*!
- * Init, the second contructor:
- * Allocate memory and spwan service thread.
- */
void init(void){
//allocate all recv frames and release them to begin xfers
- _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames());
- for (size_t i = 0; i < this->get_num_recv_frames(); i++){
- boost::shared_array<char> buff(new char[udp_simple::mtu]);
- _buffers.push_back(buff); //store a reference to this shared array
- release(buff.get());
+ _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames);
+ _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]);
+ for (size_t i = 0; i < _num_recv_frames; i++){
+ release(_recv_buffer.get() + i*_recv_frame_size);
}
//allocate all send frames and push them into the fifo
- _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames());
- for (size_t i = 0; i < this->get_num_send_frames(); i++){
- boost::shared_array<char> buff(new char[udp_simple::mtu]);
- _buffers.push_back(buff); //store a reference to this shared array
- handle_send(buff.get());
+ _pending_send_buffs = pending_buffs_type::make(_num_send_frames);
+ _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]);
+ for (size_t i = 0; i < _num_send_frames; i++){
+ handle_send(_send_buffer.get() + i*_send_frame_size);
}
//spawn the service thread that will run the io service
_thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this));
}
+ ~udp_zero_copy_asio_impl(void){
+ _io_service.stop();
+ _thread_group.join_all();
+ delete _socket;
+ }
+
//get size for internal socket buffer
template <typename Opt> size_t get_buff_size(void) const{
Opt option;
@@ -109,19 +116,6 @@ public:
return get_buff_size<Opt>();
}
- //The number of frames is approximately the buffer size divided by the max datagram size.
- //In reality, this is a phony zero-copy interface and the number of frames is infinite.
- //However, its sensible to advertise a frame count that is approximate to buffer size.
- //This way, the transport caller will have an idea about how much buffering to create.
-
- size_t get_num_recv_frames(void) const{
- return this->get_buff_size<asio::socket_base::receive_buffer_size>()/udp_simple::mtu;
- }
-
- size_t get_num_send_frames(void) const{
- return this->get_buff_size<asio::socket_base::send_buffer_size>()/udp_simple::mtu;
- }
-
//! pop a filled recv buffer off of the fifo and bind with the release callback
managed_recv_buffer::sptr get_recv_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -138,6 +132,8 @@ public:
return managed_recv_buffer::sptr();
}
+ size_t get_num_recv_frames(void) const {return _num_recv_frames;}
+
//! pop an empty send buffer off of the fifo and bind with the commit callback
managed_send_buffer::sptr get_send_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -154,8 +150,11 @@ public:
return managed_send_buffer::sptr();
}
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+
private:
void service(void){
+ set_thread_priority_safe();
_io_service.run();
}
@@ -172,7 +171,7 @@ private:
//! release a recv buffer -> start an async recv on the buffer
void release(void *mem){
_socket->async_receive(
- boost::asio::buffer(mem, udp_simple::mtu),
+ boost::asio::buffer(mem, _recv_frame_size),
boost::bind(
&udp_zero_copy_asio_impl::handle_recv,
shared_from_this(), mem,
@@ -184,7 +183,7 @@ private:
//! handle a send callback -> push the emptied memory into the fifo
void handle_send(void *mem){
boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu));
+ _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size));
}
//! commit a send buffer -> start an async send on the buffer
@@ -204,9 +203,11 @@ private:
//memory management -> buffers and fifos
boost::thread_group _thread_group;
- std::vector<boost::shared_array<char> > _buffers;
+ boost::shared_array<char> _send_buffer, _recv_buffer;
typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;
pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
};
/***********************************************************************
@@ -253,7 +254,11 @@ udp_zero_copy::sptr udp_zero_copy::make(
size_t recv_buff_size,
size_t send_buff_size
){
- udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port));
+ udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(
+ addr, port,
+ udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv
+ udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES //send
+ ));
//call the helper to resize send and recv buffers
resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");