summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/include/uhd/transport/zero_copy.hpp12
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp75
2 files changed, 43 insertions, 44 deletions
diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp
index 4fc1df9de..fdc5b141c 100644
--- a/host/include/uhd/transport/zero_copy.hpp
+++ b/host/include/uhd/transport/zero_copy.hpp
@@ -45,7 +45,7 @@ public:
* Get the size of the underlying buffer.
* \return the number of bytes
*/
- size_t size(void){
+ size_t size(void) const{
return boost::asio::buffer_size(this->get());
}
@@ -53,7 +53,7 @@ public:
* Get a pointer to the underlying buffer.
* \return a pointer into memory
*/
- template <class T> T cast(void){
+ template <class T> T cast(void) const{
return boost::asio::buffer_cast<T>(this->get());
}
@@ -63,7 +63,7 @@ private:
* The buffer has a reference to memory and a size.
* \return a boost asio const buffer
*/
- virtual const boost::asio::const_buffer &get(void) = 0;
+ virtual const boost::asio::const_buffer &get(void) const = 0;
};
/*!
@@ -87,7 +87,7 @@ public:
* Get the size of the underlying buffer.
* \return the number of bytes
*/
- size_t size(void){
+ size_t size(void) const{
return boost::asio::buffer_size(this->get());
}
@@ -95,7 +95,7 @@ public:
* Get a pointer to the underlying buffer.
* \return a pointer into memory
*/
- template <class T> T cast(void){
+ template <class T> T cast(void) const{
return boost::asio::buffer_cast<T>(this->get());
}
@@ -105,7 +105,7 @@ private:
* The buffer has a reference to memory and a size.
* \return a boost asio mutable buffer
*/
- virtual const boost::asio::mutable_buffer &get(void) = 0;
+ virtual const boost::asio::mutable_buffer &get(void) const = 0;
};
/*!
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index ee44803f4..c26b39867 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -16,9 +16,9 @@
//
#include <uhd/transport/udp_zero_copy.hpp>
+#include <uhd/utils/assert.hpp>
#include <boost/cstdint.hpp>
#include <boost/asio.hpp>
-#include <boost/thread.hpp>
#include <boost/format.hpp>
#include <iostream>
@@ -26,65 +26,56 @@ using namespace uhd::transport;
/***********************************************************************
* Managed receive buffer implementation for udp zero-copy asio:
- * Frees the memory held by the const buffer on done.
**********************************************************************/
class managed_recv_buffer_impl : public managed_recv_buffer{
public:
managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){
- _done = false;
+ /* NOP */
}
~managed_recv_buffer_impl(void){
- if (not _done) this->done();
+ /* NOP */
}
void done(void){
- _done = true;
- delete [] boost::asio::buffer_cast<const boost::uint32_t *>(_buff);
+ /* NOP */
}
private:
- const boost::asio::const_buffer &get(void){
+ const boost::asio::const_buffer &get(void) const{
return _buff;
}
const boost::asio::const_buffer _buff;
- bool _done;
};
/***********************************************************************
* Managed send buffer implementation for udp zero-copy asio:
- * Sends and frees the memory held by the mutable buffer on done.
**********************************************************************/
class managed_send_buffer_impl : public managed_send_buffer{
public:
managed_send_buffer_impl(
const boost::asio::mutable_buffer &buff,
boost::asio::ip::udp::socket *socket
- ) : _buff(buff){
- _done = false;
- _socket = socket;
+ ) : _buff(buff), _socket(socket){
+ /* NOP */
}
~managed_send_buffer_impl(void){
- if (not _done) this->done(0);
+ /* NOP */
}
void done(size_t num_bytes){
- _done = true;
- boost::uint32_t *mem = boost::asio::buffer_cast<boost::uint32_t *>(_buff);
- _socket->send(boost::asio::buffer(mem, num_bytes));
- delete [] mem;
+ _socket->send(boost::asio::buffer(_buff, num_bytes));
}
private:
- const boost::asio::mutable_buffer &get(void){
+ const boost::asio::mutable_buffer &get(void) const{
return _buff;
}
const boost::asio::mutable_buffer _buff;
boost::asio::ip::udp::socket *_socket;
- bool _done;
};
/***********************************************************************
@@ -94,6 +85,8 @@ private:
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
+static const size_t max_buff_size = 2000; //assume max size on send and recv
+
class udp_zero_copy_impl : public udp_zero_copy{
public:
//structors
@@ -121,6 +114,11 @@ public:
private:
boost::asio::ip::udp::socket *_socket;
boost::asio::io_service _io_service;
+
+ //send and recv buffer memory (allocated once)
+ boost::uint8_t _send_mem[max_buff_size], _recv_mem[max_buff_size];
+
+ managed_send_buffer::sptr _send_buff;
};
udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){
@@ -131,10 +129,25 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin
boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
- // Create, open, and connect the socket
+ // create, open, and connect the socket
_socket = new boost::asio::ip::udp::socket(_io_service);
_socket->open(boost::asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
+
+ // create the managed send buff (just once)
+ _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl(
+ boost::asio::buffer(_send_mem, max_buff_size), _socket
+ ));
+
+ // set recv timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 100*1000; //100 ms
+ UHD_ASSERT_THROW(setsockopt(
+ _socket->native(),
+ SOL_SOCKET, SO_RCVTIMEO,
+ (timeval *)&tv, sizeof(timeval)
+ ) == 0);
}
udp_zero_copy_impl::~udp_zero_copy_impl(void){
@@ -142,31 +155,17 @@ udp_zero_copy_impl::~udp_zero_copy_impl(void){
}
managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){
- //implement timeout through polling and sleeping
- size_t available = 0;
- boost::asio::deadline_timer timer(_socket->get_io_service());
- timer.expires_from_now(boost::posix_time::milliseconds(100));
- while (not ((available = _socket->available()) or timer.expires_from_now().is_negative())){
- boost::this_thread::sleep(boost::posix_time::milliseconds(1));
- }
-
- //receive only if data is available
- boost::uint32_t *buff_mem = new boost::uint32_t[available/sizeof(boost::uint32_t)];
- if (available){
- available = _socket->receive(boost::asio::buffer(buff_mem, available));
- }
+ //call recv() with timeout option
+ size_t num_bytes = _socket->receive(boost::asio::buffer(_recv_mem, max_buff_size));
//create a new managed buffer to house the data
return managed_recv_buffer::sptr(
- new managed_recv_buffer_impl(boost::asio::buffer(buff_mem, available))
+ new managed_recv_buffer_impl(boost::asio::buffer(_recv_mem, num_bytes))
);
}
managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){
- boost::uint32_t *buff_mem = new boost::uint32_t[2000/sizeof(boost::uint32_t)];
- return managed_send_buffer::sptr(
- new managed_send_buffer_impl(boost::asio::buffer(buff_mem, 2000), _socket)
- );
+ return _send_buff;
}
/***********************************************************************