diff options
| -rw-r--r-- | host/include/uhd/transport/zero_copy.hpp | 12 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 75 | 
2 files changed, 43 insertions, 44 deletions
| diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 4fc1df9de..fdc5b141c 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -45,7 +45,7 @@ public:       * Get the size of the underlying buffer.       * \return the number of bytes       */ -    size_t size(void){ +    size_t size(void) const{          return boost::asio::buffer_size(this->get());      } @@ -53,7 +53,7 @@ public:       * Get a pointer to the underlying buffer.       * \return a pointer into memory       */ -    template <class T> T cast(void){ +    template <class T> T cast(void) const{          return boost::asio::buffer_cast<T>(this->get());      } @@ -63,7 +63,7 @@ private:       * The buffer has a reference to memory and a size.       * \return a boost asio const buffer       */ -    virtual const boost::asio::const_buffer &get(void) = 0; +    virtual const boost::asio::const_buffer &get(void) const = 0;  };  /*! @@ -87,7 +87,7 @@ public:       * Get the size of the underlying buffer.       * \return the number of bytes       */ -    size_t size(void){ +    size_t size(void) const{          return boost::asio::buffer_size(this->get());      } @@ -95,7 +95,7 @@ public:       * Get a pointer to the underlying buffer.       * \return a pointer into memory       */ -    template <class T> T cast(void){ +    template <class T> T cast(void) const{          return boost::asio::buffer_cast<T>(this->get());      } @@ -105,7 +105,7 @@ private:       * The buffer has a reference to memory and a size.       * \return a boost asio mutable buffer       */ -    virtual const boost::asio::mutable_buffer &get(void) = 0; +    virtual const boost::asio::mutable_buffer &get(void) const = 0;  };  /*! diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index ee44803f4..c26b39867 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -16,9 +16,9 @@  //  #include <uhd/transport/udp_zero_copy.hpp> +#include <uhd/utils/assert.hpp>  #include <boost/cstdint.hpp>  #include <boost/asio.hpp> -#include <boost/thread.hpp>  #include <boost/format.hpp>  #include <iostream> @@ -26,65 +26,56 @@ using namespace uhd::transport;  /***********************************************************************   * Managed receive buffer implementation for udp zero-copy asio: - *   Frees the memory held by the const buffer on done.   **********************************************************************/  class managed_recv_buffer_impl : public managed_recv_buffer{  public:      managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ -        _done = false; +        /* NOP */      }      ~managed_recv_buffer_impl(void){ -        if (not _done) this->done(); +        /* NOP */      }      void done(void){ -        _done = true; -        delete [] boost::asio::buffer_cast<const boost::uint32_t *>(_buff); +        /* NOP */      }  private: -    const boost::asio::const_buffer &get(void){ +    const boost::asio::const_buffer &get(void) const{          return _buff;      }      const boost::asio::const_buffer _buff; -    bool _done;  };  /***********************************************************************   * Managed send buffer implementation for udp zero-copy asio: - *   Sends and frees the memory held by the mutable buffer on done.   **********************************************************************/  class managed_send_buffer_impl : public managed_send_buffer{  public:      managed_send_buffer_impl(          const boost::asio::mutable_buffer &buff,          boost::asio::ip::udp::socket *socket -    ) : _buff(buff){ -        _done = false; -        _socket = socket; +    ) : _buff(buff), _socket(socket){ +        /* NOP */      }      ~managed_send_buffer_impl(void){ -        if (not _done) this->done(0); +        /* NOP */      }      void done(size_t num_bytes){ -        _done = true; -        boost::uint32_t *mem = boost::asio::buffer_cast<boost::uint32_t *>(_buff); -        _socket->send(boost::asio::buffer(mem, num_bytes)); -        delete [] mem; +        _socket->send(boost::asio::buffer(_buff, num_bytes));      }  private: -    const boost::asio::mutable_buffer &get(void){ +    const boost::asio::mutable_buffer &get(void) const{          return _buff;      }      const boost::asio::mutable_buffer _buff;      boost::asio::ip::udp::socket      *_socket; -    bool _done;  };  /*********************************************************************** @@ -94,6 +85,8 @@ private:   *   However, it is not a true zero copy implementation as each   *   send and recv requires a copy operation to/from userspace.   **********************************************************************/ +static const size_t max_buff_size = 2000; //assume max size on send and recv +  class udp_zero_copy_impl : public udp_zero_copy{  public:      //structors @@ -121,6 +114,11 @@ public:  private:      boost::asio::ip::udp::socket   *_socket;      boost::asio::io_service        _io_service; + +    //send and recv buffer memory (allocated once) +    boost::uint8_t _send_mem[max_buff_size], _recv_mem[max_buff_size]; + +    managed_send_buffer::sptr _send_buff;  };  udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){ @@ -131,10 +129,25 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin      boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);      boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); -    // Create, open, and connect the socket +    // create, open, and connect the socket      _socket = new boost::asio::ip::udp::socket(_io_service);      _socket->open(boost::asio::ip::udp::v4());      _socket->connect(receiver_endpoint); + +    // create the managed send buff (just once) +    _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( +        boost::asio::buffer(_send_mem, max_buff_size), _socket +    )); + +    // set recv timeout +    timeval tv; +    tv.tv_sec = 0; +    tv.tv_usec = 100*1000; //100 ms +    UHD_ASSERT_THROW(setsockopt( +        _socket->native(), +        SOL_SOCKET, SO_RCVTIMEO, +        (timeval *)&tv, sizeof(timeval) +    ) == 0);  }  udp_zero_copy_impl::~udp_zero_copy_impl(void){ @@ -142,31 +155,17 @@ udp_zero_copy_impl::~udp_zero_copy_impl(void){  }  managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){ -    //implement timeout through polling and sleeping -    size_t available = 0; -    boost::asio::deadline_timer timer(_socket->get_io_service()); -    timer.expires_from_now(boost::posix_time::milliseconds(100)); -    while (not ((available = _socket->available()) or timer.expires_from_now().is_negative())){ -        boost::this_thread::sleep(boost::posix_time::milliseconds(1)); -    } - -    //receive only if data is available -    boost::uint32_t *buff_mem = new boost::uint32_t[available/sizeof(boost::uint32_t)]; -    if (available){ -        available = _socket->receive(boost::asio::buffer(buff_mem, available)); -    } +    //call recv() with timeout option +    size_t num_bytes = _socket->receive(boost::asio::buffer(_recv_mem, max_buff_size));      //create a new managed buffer to house the data      return managed_recv_buffer::sptr( -        new managed_recv_buffer_impl(boost::asio::buffer(buff_mem, available)) +        new managed_recv_buffer_impl(boost::asio::buffer(_recv_mem, num_bytes))      );  }  managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){ -    boost::uint32_t *buff_mem = new boost::uint32_t[2000/sizeof(boost::uint32_t)]; -    return managed_send_buffer::sptr( -        new managed_send_buffer_impl(boost::asio::buffer(buff_mem, 2000), _socket) -    ); +    return _send_buff;  }  /*********************************************************************** | 
