// // Copyright 2010 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program. If not, see . // #include #include #include #include #include #include using namespace uhd::transport; /*********************************************************************** * Constants **********************************************************************/ static const size_t MIN_SOCK_BUFF_SIZE = size_t(100e3); static const size_t MAX_DGRAM_SIZE = 2048; //assume max size on send and recv static const double RECV_TIMEOUT = 0.1; // 100 ms /*********************************************************************** * Managed receive buffer implementation for udp zero-copy asio: **********************************************************************/ class managed_recv_buffer_impl : public managed_recv_buffer{ public: managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ /* NOP */ } ~managed_recv_buffer_impl(void){ delete [] this->cast(); } private: const boost::asio::const_buffer &get(void) const{ return _buff; } const boost::asio::const_buffer _buff; }; /*********************************************************************** * Managed send buffer implementation for udp zero-copy asio: **********************************************************************/ class managed_send_buffer_impl : public managed_send_buffer{ public: managed_send_buffer_impl( const boost::asio::mutable_buffer &buff, boost::asio::ip::udp::socket *socket ) : _buff(buff), _socket(socket){ /* NOP */ } ~managed_send_buffer_impl(void){ /* NOP */ } void commit(size_t num_bytes){ _socket->send(boost::asio::buffer(_buff, num_bytes)); } private: const boost::asio::mutable_buffer &get(void) const{ return _buff; } const boost::asio::mutable_buffer _buff; boost::asio::ip::udp::socket *_socket; }; /*********************************************************************** * Zero Copy UDP implementation with ASIO: * This is the portable zero copy implementation for systems * where a faster, platform specific solution is not available. * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ class udp_zero_copy_impl : public udp_zero_copy{ public: typedef boost::shared_ptr sptr; //structors udp_zero_copy_impl(const std::string &addr, const std::string &port); ~udp_zero_copy_impl(void); //send/recv managed_recv_buffer::sptr get_recv_buff(void); managed_send_buffer::sptr get_send_buff(void); //manage buffer template size_t get_buff_size(void){ Opt option; _socket->get_option(option); return option.value(); } template size_t resize_buff(size_t num_bytes){ Opt option(num_bytes); _socket->set_option(option); return get_buff_size(); } private: boost::asio::ip::udp::socket *_socket; boost::asio::io_service _io_service; //send and recv buffer memory (allocated once) boost::uint8_t _send_mem[MIN_SOCK_BUFF_SIZE]; managed_send_buffer::sptr _send_buff; }; udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){ //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; // resolve the address boost::asio::ip::udp::resolver resolver(_io_service); boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); // create, open, and connect the socket _socket = new boost::asio::ip::udp::socket(_io_service); _socket->open(boost::asio::ip::udp::v4()); _socket->connect(receiver_endpoint); // create the managed send buff (just once) _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( boost::asio::buffer(_send_mem, MIN_SOCK_BUFF_SIZE), _socket )); // set recv timeout timeval tv; tv.tv_sec = 0; tv.tv_usec = size_t(RECV_TIMEOUT*1e6); UHD_ASSERT_THROW(setsockopt( _socket->native(), SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof(timeval) ) == 0); } udp_zero_copy_impl::~udp_zero_copy_impl(void){ delete _socket; } managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){ //allocate memory boost::uint8_t *recv_mem = new boost::uint8_t[MAX_DGRAM_SIZE]; //call recv() with timeout option size_t num_bytes = _socket->receive(boost::asio::buffer(recv_mem, MAX_DGRAM_SIZE)); //create a new managed buffer to house the data return managed_recv_buffer::sptr( new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes)) ); } managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){ return _send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these } /*********************************************************************** * UDP zero copy make function **********************************************************************/ template static inline void resize_buff_helper( udp_zero_copy_impl::sptr udp_trans, size_t target_size, const std::string &name ){ //resize the buffer if size was provided if (target_size > 0){ size_t actual_size = udp_trans->resize_buff(target_size); if (target_size != actual_size) std::cout << boost::format( "Target %s buffer size: %d\n" "Actual %s byffer size: %d" ) % name % target_size % name % actual_size << std::endl; } //otherwise, ensure that the buffer is at least the minimum size else if (udp_trans->get_buff_size() < MIN_SOCK_BUFF_SIZE){ resize_buff_helper(udp_trans, MIN_SOCK_BUFF_SIZE, name); } } udp_zero_copy::sptr udp_zero_copy::make( const std::string &addr, const std::string &port, size_t recv_buff_size, size_t send_buff_size ){ udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port)); //call the helper to resize send and recv buffers resize_buff_helper(udp_trans, recv_buff_size, "recv"); resize_buff_helper (udp_trans, send_buff_size, "send"); return udp_trans; }