diff options
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 75 |
1 files changed, 37 insertions, 38 deletions
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index ee44803f4..c26b39867 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -16,9 +16,9 @@ // #include <uhd/transport/udp_zero_copy.hpp> +#include <uhd/utils/assert.hpp> #include <boost/cstdint.hpp> #include <boost/asio.hpp> -#include <boost/thread.hpp> #include <boost/format.hpp> #include <iostream> @@ -26,65 +26,56 @@ using namespace uhd::transport; /*********************************************************************** * Managed receive buffer implementation for udp zero-copy asio: - * Frees the memory held by the const buffer on done. **********************************************************************/ class managed_recv_buffer_impl : public managed_recv_buffer{ public: managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ - _done = false; + /* NOP */ } ~managed_recv_buffer_impl(void){ - if (not _done) this->done(); + /* NOP */ } void done(void){ - _done = true; - delete [] boost::asio::buffer_cast<const boost::uint32_t *>(_buff); + /* NOP */ } private: - const boost::asio::const_buffer &get(void){ + const boost::asio::const_buffer &get(void) const{ return _buff; } const boost::asio::const_buffer _buff; - bool _done; }; /*********************************************************************** * Managed send buffer implementation for udp zero-copy asio: - * Sends and frees the memory held by the mutable buffer on done. **********************************************************************/ class managed_send_buffer_impl : public managed_send_buffer{ public: managed_send_buffer_impl( const boost::asio::mutable_buffer &buff, boost::asio::ip::udp::socket *socket - ) : _buff(buff){ - _done = false; - _socket = socket; + ) : _buff(buff), _socket(socket){ + /* NOP */ } ~managed_send_buffer_impl(void){ - if (not _done) this->done(0); + /* NOP */ } void done(size_t num_bytes){ - _done = true; - boost::uint32_t *mem = boost::asio::buffer_cast<boost::uint32_t *>(_buff); - _socket->send(boost::asio::buffer(mem, num_bytes)); - delete [] mem; + _socket->send(boost::asio::buffer(_buff, num_bytes)); } private: - const boost::asio::mutable_buffer &get(void){ + const boost::asio::mutable_buffer &get(void) const{ return _buff; } const boost::asio::mutable_buffer _buff; boost::asio::ip::udp::socket *_socket; - bool _done; }; /*********************************************************************** @@ -94,6 +85,8 @@ private: * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ +static const size_t max_buff_size = 2000; //assume max size on send and recv + class udp_zero_copy_impl : public udp_zero_copy{ public: //structors @@ -121,6 +114,11 @@ public: private: boost::asio::ip::udp::socket *_socket; boost::asio::io_service _io_service; + + //send and recv buffer memory (allocated once) + boost::uint8_t _send_mem[max_buff_size], _recv_mem[max_buff_size]; + + managed_send_buffer::sptr _send_buff; }; udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){ @@ -131,10 +129,25 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - // Create, open, and connect the socket + // create, open, and connect the socket _socket = new boost::asio::ip::udp::socket(_io_service); _socket->open(boost::asio::ip::udp::v4()); _socket->connect(receiver_endpoint); + + // create the managed send buff (just once) + _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( + boost::asio::buffer(_send_mem, max_buff_size), _socket + )); + + // set recv timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100*1000; //100 ms + UHD_ASSERT_THROW(setsockopt( + _socket->native(), + SOL_SOCKET, SO_RCVTIMEO, + (timeval *)&tv, sizeof(timeval) + ) == 0); } udp_zero_copy_impl::~udp_zero_copy_impl(void){ @@ -142,31 +155,17 @@ udp_zero_copy_impl::~udp_zero_copy_impl(void){ } managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){ - //implement timeout through polling and sleeping - size_t available = 0; - boost::asio::deadline_timer timer(_socket->get_io_service()); - timer.expires_from_now(boost::posix_time::milliseconds(100)); - while (not ((available = _socket->available()) or timer.expires_from_now().is_negative())){ - boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - } - - //receive only if data is available - boost::uint32_t *buff_mem = new boost::uint32_t[available/sizeof(boost::uint32_t)]; - if (available){ - available = _socket->receive(boost::asio::buffer(buff_mem, available)); - } + //call recv() with timeout option + size_t num_bytes = _socket->receive(boost::asio::buffer(_recv_mem, max_buff_size)); //create a new managed buffer to house the data return managed_recv_buffer::sptr( - new managed_recv_buffer_impl(boost::asio::buffer(buff_mem, available)) + new managed_recv_buffer_impl(boost::asio::buffer(_recv_mem, num_bytes)) ); } managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){ - boost::uint32_t *buff_mem = new boost::uint32_t[2000/sizeof(boost::uint32_t)]; - return managed_send_buffer::sptr( - new managed_send_buffer_impl(boost::asio::buffer(buff_mem, 2000), _socket) - ); + return _send_buff; } /*********************************************************************** |