From c6f17f1e2f908747ae1547fa43b2c22c3c20ba50 Mon Sep 17 00:00:00 2001
From: Josh Blum <josh@joshknows.com>
Date: Sun, 3 Oct 2010 19:34:41 -0700
Subject: uhd: changed buffer allocations to be in a single chunk, udp: pass
 frame sizes into the impl constructor

---
 host/lib/transport/libusb1_zero_copy.cpp  | 27 +++++-----
 host/lib/transport/udp_zero_copy_asio.cpp | 85 ++++++++++++++++---------------
 2 files changed, 59 insertions(+), 53 deletions(-)

(limited to 'host/lib')

diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index c819302b6..ab48e4fc4 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -100,13 +100,13 @@ private:
     lut_buff_type::sptr _completed_list;
 
     //! a list of all transfer structs we allocated
-    std::vector<libusb_transfer *>  _all_luts;
+    std::vector<libusb_transfer *> _all_luts;
 
-    //! a list of shared arrays for the transfer buffers
-    std::vector<boost::shared_array<boost::uint8_t> > _buffers;
+    //! a block of memory for the transfer buffers
+    boost::shared_array<char> _buffer;
 
     // Calls for processing asynchronous I/O
-    libusb_transfer *allocate_transfer(int buff_len);
+    libusb_transfer *allocate_transfer(void *mem, size_t len);
     void print_transfer_status(libusb_transfer *lut);
 };
 
@@ -154,9 +154,9 @@ usb_endpoint::usb_endpoint(
     _input(input)
 {
     _completed_list = lut_buff_type::make(num_transfers);
-
+    _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]);
     for (size_t i = 0; i < num_transfers; i++){
-        _all_luts.push_back(allocate_transfer(transfer_size));
+        _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size));
 
         //input luts are immediately submitted to be filled
         //output luts go into the completed list as free buffers
@@ -193,23 +193,23 @@ usb_endpoint::~usb_endpoint(void){
  * Allocate a libusb transfer
  * The allocated transfer - and buffer it contains - is repeatedly
  * submitted, reaped, and reused and should not be freed until shutdown.
- * \param buff_len size of the individual buffer held by each transfer
+ * \param mem a pointer to the buffer memory
+ * \param len size of the individual buffer
  * \return pointer to an allocated libusb_transfer
  */
-libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){
+libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){
     libusb_transfer *lut = libusb_alloc_transfer(0);
-
-    boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]);
-    _buffers.push_back(buff); //store a reference to this shared array
+    UHD_ASSERT_THROW(lut != NULL);
 
     unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0));
+    unsigned char *buff = reinterpret_cast<unsigned char *>(mem);
     libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);
 
     libusb_fill_bulk_transfer(lut,                // transfer
                               _handle->get(),     // dev_handle
                               endpoint,           // endpoint
-                              buff.get(),         // buffer
-                              buff_len,           // length
+                              buff,               // buffer
+                              len,                // length
                               lut_callback,       // callback
                               this,               // user_data
                               0);                 // timeout
@@ -232,6 +232,7 @@ void usb_endpoint::submit(libusb_transfer *lut){
  * \param lut pointer to an libusb_transfer
  */
 void usb_endpoint::print_transfer_status(libusb_transfer *lut){
+    std::cout << "here " << lut->status << std::endl;
     switch (lut->status) {
     case LIBUSB_TRANSFER_COMPLETED:
         if (lut->actual_length < lut->length) {
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 70e7514a1..a1eb516fc 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -18,6 +18,7 @@
 #include <uhd/transport/udp_zero_copy.hpp>
 #include <uhd/transport/udp_simple.hpp> //mtu
 #include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/utils/thread_priority.hpp>
 #include <uhd/utils/assert.hpp>
 #include <uhd/utils/warning.hpp>
 #include <boost/shared_array.hpp>
@@ -26,6 +27,7 @@
 #include <boost/thread.hpp>
 #include <iostream>
 
+using namespace uhd;
 using namespace uhd::transport;
 namespace asio = boost::asio;
 
@@ -34,11 +36,15 @@ namespace asio = boost::asio;
  **********************************************************************/
 //enough buffering for half a second of samples at full rate on usrp2
 static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
+
 //Large buffers cause more underflow at high rates.
 //Perhaps this is due to the kernel scheduling,
 //but may change with host-based flow control.
 static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);
 
+//the number of async frames to allocate for each send and recv
+static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32;
+
 /***********************************************************************
  * Zero Copy UDP implementation with ASIO:
  *   This is the portable zero copy implementation for systems
@@ -50,51 +56,52 @@ class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_share
 public:
     typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
 
-    udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){
+    udp_zero_copy_asio_impl(
+        const std::string &addr, const std::string &port,
+        size_t recv_frame_size, size_t num_recv_frames,
+        size_t send_frame_size, size_t num_send_frames
+    ):
+        _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames),
+        _send_frame_size(send_frame_size), _num_send_frames(num_send_frames)
+    {
         //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
 
-        // resolve the address
+        //resolve the address
         asio::ip::udp::resolver resolver(_io_service);
         asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
         asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
 
-        // create, open, and connect the socket
+        //create, open, and connect the socket
         _socket = new asio::ip::udp::socket(_io_service);
         _socket->open(asio::ip::udp::v4());
         _socket->connect(receiver_endpoint);
     }
 
-    ~udp_zero_copy_asio_impl(void){
-        _io_service.stop();
-        _thread_group.join_all();
-        delete _socket;
-    }
-
-    /*!
-     * Init, the second contructor:
-     * Allocate memory and spwan service thread.
-     */
     void init(void){
         //allocate all recv frames and release them to begin xfers
-        _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames());
-        for (size_t i = 0; i < this->get_num_recv_frames(); i++){
-            boost::shared_array<char> buff(new char[udp_simple::mtu]);
-            _buffers.push_back(buff); //store a reference to this shared array
-            release(buff.get());
+        _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames);
+        _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]);
+        for (size_t i = 0; i < _num_recv_frames; i++){
+            release(_recv_buffer.get() + i*_recv_frame_size);
         }
 
         //allocate all send frames and push them into the fifo
-        _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames());
-        for (size_t i = 0; i < this->get_num_send_frames(); i++){
-            boost::shared_array<char> buff(new char[udp_simple::mtu]);
-            _buffers.push_back(buff); //store a reference to this shared array
-            handle_send(buff.get());
+        _pending_send_buffs = pending_buffs_type::make(_num_send_frames);
+        _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]);
+        for (size_t i = 0; i < _num_send_frames; i++){
+            handle_send(_send_buffer.get() + i*_send_frame_size);
         }
 
         //spawn the service thread that will run the io service
         _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this));
     }
 
+    ~udp_zero_copy_asio_impl(void){
+        _io_service.stop();
+        _thread_group.join_all();
+        delete _socket;
+    }
+
     //get size for internal socket buffer
     template <typename Opt> size_t get_buff_size(void) const{
         Opt option;
@@ -109,19 +116,6 @@ public:
         return get_buff_size<Opt>();
     }
 
-    //The number of frames is approximately the buffer size divided by the max datagram size.
-    //In reality, this is a phony zero-copy interface and the number of frames is infinite.
-    //However, its sensible to advertise a frame count that is approximate to buffer size.
-    //This way, the transport caller will have an idea about how much buffering to create.
-
-    size_t get_num_recv_frames(void) const{
-        return this->get_buff_size<asio::socket_base::receive_buffer_size>()/udp_simple::mtu;
-    }
-
-    size_t get_num_send_frames(void) const{
-        return this->get_buff_size<asio::socket_base::send_buffer_size>()/udp_simple::mtu;
-    }
-
     //! pop a filled recv buffer off of the fifo and bind with the release callback
     managed_recv_buffer::sptr get_recv_buff(double timeout){
         boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -138,6 +132,8 @@ public:
         return managed_recv_buffer::sptr();
     }
 
+    size_t get_num_recv_frames(void) const {return _num_recv_frames;}
+
     //! pop an empty send buffer off of the fifo and bind with the commit callback
     managed_send_buffer::sptr get_send_buff(double timeout){
         boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -154,8 +150,11 @@ public:
         return managed_send_buffer::sptr();
     }
 
+    size_t get_num_send_frames(void) const {return _num_send_frames;}
+
 private:
     void service(void){
+        set_thread_priority_safe();
         _io_service.run();
     }
 
@@ -172,7 +171,7 @@ private:
     //! release a recv buffer -> start an async recv on the buffer
     void release(void *mem){
         _socket->async_receive(
-            boost::asio::buffer(mem, udp_simple::mtu),
+            boost::asio::buffer(mem, _recv_frame_size),
             boost::bind(
                 &udp_zero_copy_asio_impl::handle_recv,
                 shared_from_this(), mem,
@@ -184,7 +183,7 @@ private:
     //! handle a send callback -> push the emptied memory into the fifo
     void handle_send(void *mem){
         boost::this_thread::disable_interruption di; //disable because the wait can throw
-        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu));
+        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size));
     }
 
     //! commit a send buffer -> start an async send on the buffer
@@ -204,9 +203,11 @@ private:
 
     //memory management -> buffers and fifos
     boost::thread_group _thread_group;
-    std::vector<boost::shared_array<char> > _buffers;
+    boost::shared_array<char> _send_buffer, _recv_buffer;
     typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;
     pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
+    const size_t _recv_frame_size, _num_recv_frames;
+    const size_t _send_frame_size, _num_send_frames;
 };
 
 /***********************************************************************
@@ -253,7 +254,11 @@ udp_zero_copy::sptr udp_zero_copy::make(
     size_t recv_buff_size,
     size_t send_buff_size
 ){
-    udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port));
+    udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(
+        addr, port,
+        udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv
+        udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES  //send
+    ));
 
     //call the helper to resize send and recv buffers
     resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
-- 
cgit v1.2.3