diff options
author | Josh Blum <josh@joshknows.com> | 2010-06-03 01:04:36 +0000 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2010-06-03 01:04:36 +0000 |
commit | 565e46fa1038bebf59257a8b3eefae4487cf0946 (patch) | |
tree | bc5c9085824cc61bf5c5479d5c357e6ec37fdb58 /host/lib/transport | |
parent | 66deed6015f2a2bc67f17f6630ca62a41b596090 (diff) | |
parent | b2054a45d45ba85e30ff601159b18f5ebd15dd76 (diff) | |
download | uhd-565e46fa1038bebf59257a8b3eefae4487cf0946.tar.gz uhd-565e46fa1038bebf59257a8b3eefae4487cf0946.tar.bz2 uhd-565e46fa1038bebf59257a8b3eefae4487cf0946.zip |
Merge branch 'master' of ettus.sourcerepo.com:ettus/uhdpriv into usrp_e
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 195 | ||||
-rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 27 | ||||
-rw-r--r-- | host/lib/transport/zero_copy.cpp | 139 |
4 files changed, 235 insertions, 127 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index ed8c35225..a74f7d527 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -50,4 +50,5 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_SOURCE_DIR}/lib/transport/udp_simple.cpp ${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp ${CMAKE_SOURCE_DIR}/lib/transport/vrt_packet_handler.hpp + ${CMAKE_SOURCE_DIR}/lib/transport/zero_copy.cpp ) diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index f8a222475..ced606777 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -28,58 +28,8 @@ using namespace uhd::transport; * Constants **********************************************************************/ static const size_t MIN_SOCK_BUFF_SIZE = size_t(100e3); -static const size_t MAX_DGRAM_SIZE = 2048; //assume max size on send and recv -static const double RECV_TIMEOUT = 0.1; // 100 ms - -/*********************************************************************** - * Managed receive buffer implementation for udp zero-copy asio: - **********************************************************************/ -class managed_recv_buffer_impl : public managed_recv_buffer{ -public: - managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ - /* NOP */ - } - - ~managed_recv_buffer_impl(void){ - delete [] this->cast<const boost::uint8_t *>(); - } - -private: - const boost::asio::const_buffer &get(void) const{ - return _buff; - } - - const boost::asio::const_buffer _buff; -}; - -/*********************************************************************** - * Managed send buffer implementation for udp zero-copy asio: - **********************************************************************/ -class managed_send_buffer_impl : public managed_send_buffer{ -public: - managed_send_buffer_impl( - const boost::asio::mutable_buffer &buff, - boost::asio::ip::udp::socket *socket - ) : _buff(buff), _socket(socket){ - /* NOP */ - } - - ~managed_send_buffer_impl(void){ - /* NOP */ - } - - void commit(size_t num_bytes){ - _socket->send(boost::asio::buffer(_buff, num_bytes)); - } - -private: - const boost::asio::mutable_buffer &get(void) const{ - return _buff; - } - - const boost::asio::mutable_buffer _buff; - boost::asio::ip::udp::socket *_socket; -}; +static const size_t MAX_DGRAM_SIZE = 1500; //assume max size on send and recv +static const double RECV_TIMEOUT = 0.1; //100 ms /*********************************************************************** * Zero Copy UDP implementation with ASIO: @@ -88,95 +38,108 @@ private: * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class udp_zero_copy_impl : public udp_zero_copy{ +class udp_zero_copy_impl: + public phony_zero_copy_recv_if, + public phony_zero_copy_send_if, + public udp_zero_copy +{ public: typedef boost::shared_ptr<udp_zero_copy_impl> sptr; - //structors - udp_zero_copy_impl(const std::string &addr, const std::string &port); - ~udp_zero_copy_impl(void); + udp_zero_copy_impl( + const std::string &addr, + const std::string &port + ): + phony_zero_copy_recv_if(MAX_DGRAM_SIZE), + phony_zero_copy_send_if(MAX_DGRAM_SIZE) + { + //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + + // resolve the address + boost::asio::ip::udp::resolver resolver(_io_service); + boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); + boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + + // create, open, and connect the socket + _socket = new boost::asio::ip::udp::socket(_io_service); + _socket->open(boost::asio::ip::udp::v4()); + _socket->connect(receiver_endpoint); + _sock_fd = _socket->native(); + } - //send/recv - managed_recv_buffer::sptr get_recv_buff(void); - managed_send_buffer::sptr get_send_buff(void); + ~udp_zero_copy_impl(void){ + delete _socket; + } - //manage buffer - template <typename Opt> size_t get_buff_size(void){ + //get size for internal socket buffer + template <typename Opt> size_t get_buff_size(void) const{ Opt option; _socket->get_option(option); return option.value(); } + //set size for internal socket buffer template <typename Opt> size_t resize_buff(size_t num_bytes){ Opt option(num_bytes); _socket->set_option(option); return get_buff_size<Opt>(); } -private: - boost::asio::ip::udp::socket *_socket; - boost::asio::io_service _io_service; - - //send and recv buffer memory (allocated once) - boost::uint8_t _send_mem[MIN_SOCK_BUFF_SIZE]; - - managed_send_buffer::sptr _send_buff; -}; -udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){ - //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - - // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - - // create, open, and connect the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); - _socket->connect(receiver_endpoint); - - // create the managed send buff (just once) - _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( - boost::asio::buffer(_send_mem, MIN_SOCK_BUFF_SIZE), _socket - )); - - // set recv timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = size_t(RECV_TIMEOUT*1e6); - UHD_ASSERT_THROW(setsockopt( - _socket->native(), - SOL_SOCKET, SO_RCVTIMEO, - (const char *)&tv, sizeof(timeval) - ) == 0); -} + //The number of frames is approximately the buffer size divided by the max datagram size. + //In reality, this is a phony zero-copy interface and the number of frames is infinite. + //However, its sensible to advertise a frame count that is approximate to buffer size. + //This way, the transport caller will have an idea about how much buffering to create. -udp_zero_copy_impl::~udp_zero_copy_impl(void){ - delete _socket; -} - -managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){ - //allocate memory - boost::uint8_t *recv_mem = new boost::uint8_t[MAX_DGRAM_SIZE]; + size_t get_num_recv_frames(void) const{ + return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/MAX_DGRAM_SIZE; + } - //call recv() with timeout option - size_t num_bytes = _socket->receive(boost::asio::buffer(recv_mem, MIN_SOCK_BUFF_SIZE)); + size_t get_num_send_frames(void) const{ + return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/MAX_DGRAM_SIZE; + } - //create a new managed buffer to house the data - return managed_recv_buffer::sptr( - new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes)) - ); -} +private: + boost::asio::ip::udp::socket *_socket; + boost::asio::io_service _io_service; + int _sock_fd; + + size_t recv(const boost::asio::mutable_buffer &buff){ + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = int(RECV_TIMEOUT*1e6); + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(_sock_fd, &rset); + + //call select to perform timed wait + if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0; + + return ::recv( + _sock_fd, + boost::asio::buffer_cast<char *>(buff), + boost::asio::buffer_size(buff), + 0 + ); + } -managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){ - return _send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these -} + size_t send(const boost::asio::const_buffer &buff){ + return ::send( + _sock_fd, + boost::asio::buffer_cast<const char *>(buff), + boost::asio::buffer_size(buff), + 0 + ); + } +}; /*********************************************************************** * UDP zero copy make function **********************************************************************/ -template<typename Opt> static inline void resize_buff_helper( +template<typename Opt> static void resize_buff_helper( udp_zero_copy_impl::sptr udp_trans, size_t target_size, const std::string &name diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index e64e3383d..d6b863040 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -54,6 +54,8 @@ namespace vrt_packet_handler{ } }; + typedef boost::function<uhd::transport::managed_recv_buffer::sptr(void)> get_recv_buff_t; + typedef boost::function<void(uhd::transport::managed_recv_buffer::sptr)> recv_cb_t; static UHD_INLINE void recv_cb_nop(uhd::transport::managed_recv_buffer::sptr){ @@ -112,15 +114,16 @@ namespace vrt_packet_handler{ const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, double tick_rate, - uhd::transport::zero_copy_if::sptr zc_iface, + const get_recv_buff_t &get_recv_buff, //use these two params to handle a layer above vrt size_t vrt_header_offset_words32, - const recv_cb_t& recv_cb + const recv_cb_t &recv_cb ){ //perform a receive if no rx data is waiting to be copied if (boost::asio::buffer_size(state.copy_buff) == 0){ state.fragment_offset_in_samps = 0; - state.managed_buff = zc_iface->get_recv_buff(); + state.managed_buff = get_recv_buff(); + if (state.managed_buff.get() == NULL) return 0; recv_cb(state.managed_buff); //callback before vrt unpack try{ _recv1_helper( @@ -169,7 +172,7 @@ namespace vrt_packet_handler{ const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, double tick_rate, - uhd::transport::zero_copy_if::sptr zc_iface, + const get_recv_buff_t &get_recv_buff, //use these two params to handle a layer above vrt size_t vrt_header_offset_words32 = 0, const recv_cb_t& recv_cb = &recv_cb_nop @@ -189,7 +192,7 @@ namespace vrt_packet_handler{ metadata, io_type, otw_type, tick_rate, - zc_iface, + get_recv_buff, vrt_header_offset_words32, recv_cb ); @@ -208,7 +211,7 @@ namespace vrt_packet_handler{ (accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept io_type, otw_type, tick_rate, - zc_iface, + get_recv_buff, vrt_header_offset_words32, recv_cb ); @@ -234,6 +237,8 @@ namespace vrt_packet_handler{ } }; + typedef boost::function<uhd::transport::managed_send_buffer::sptr(void)> get_send_buff_t; + typedef boost::function<void(uhd::transport::managed_send_buffer::sptr)> send_cb_t; static UHD_INLINE void send_cb_nop(uhd::transport::managed_send_buffer::sptr){ @@ -252,12 +257,12 @@ namespace vrt_packet_handler{ const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, double tick_rate, - uhd::transport::zero_copy_if::sptr zc_iface, + const get_send_buff_t &get_send_buff, size_t vrt_header_offset_words32, const send_cb_t& send_cb ){ //get a new managed send buffer - uhd::transport::managed_send_buffer::sptr send_buff = zc_iface->get_send_buff(); + uhd::transport::managed_send_buffer::sptr send_buff = get_send_buff(); boost::uint32_t *tx_mem = send_buff->cast<boost::uint32_t *>() + vrt_header_offset_words32; size_t num_header_words32, num_packet_words32; @@ -298,7 +303,7 @@ namespace vrt_packet_handler{ const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, double tick_rate, - uhd::transport::zero_copy_if::sptr zc_iface, + const get_send_buff_t &get_send_buff, size_t max_samples_per_packet, //use these two params to handle a layer above vrt size_t vrt_header_offset_words32 = 0, @@ -319,7 +324,7 @@ namespace vrt_packet_handler{ metadata, io_type, otw_type, tick_rate, - zc_iface, + get_send_buff, vrt_header_offset_words32, send_cb ); @@ -353,7 +358,7 @@ namespace vrt_packet_handler{ md, io_type, otw_type, tick_rate, - zc_iface, + get_send_buff, vrt_header_offset_words32, send_cb ); diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp new file mode 100644 index 000000000..27f41329b --- /dev/null +++ b/host/lib/transport/zero_copy.cpp @@ -0,0 +1,139 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/zero_copy.hpp> +#include <boost/cstdint.hpp> +#include <boost/function.hpp> +#include <boost/bind.hpp> + +using namespace uhd::transport; + +/*********************************************************************** + * The pure-virtual deconstructor needs an implementation to be happy + **********************************************************************/ +managed_recv_buffer::~managed_recv_buffer(void){ + /* NOP */ +} + +/*********************************************************************** + * Phony zero-copy recv interface implementation + **********************************************************************/ + +//! phony zero-copy recv buffer implementation +class managed_recv_buffer_impl : public managed_recv_buffer{ +public: + managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ + /* NOP */ + } + + ~managed_recv_buffer_impl(void){ + delete [] this->cast<const boost::uint8_t *>(); + } + +private: + const boost::asio::const_buffer &get(void) const{ + return _buff; + } + + const boost::asio::const_buffer _buff; +}; + +//! phony zero-copy recv interface implementation +struct phony_zero_copy_recv_if::impl{ + size_t max_buff_size; +}; + +phony_zero_copy_recv_if::phony_zero_copy_recv_if(size_t max_buff_size){ + _impl = UHD_PIMPL_MAKE(impl, ()); + _impl->max_buff_size = max_buff_size; +} + +phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){ + /* NOP */ +} + +managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(void){ + //allocate memory + boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size]; + + //call recv() with timeout option + size_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size)); + + //create a new managed buffer to house the data + return managed_recv_buffer::sptr( + new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes)) + ); +} + +/*********************************************************************** + * Phony zero-copy send interface implementation + **********************************************************************/ + +//! phony zero-copy send buffer implementation +class managed_send_buffer_impl : public managed_send_buffer{ +public: + typedef boost::function<size_t(const boost::asio::const_buffer &)> send_fcn_t; + + managed_send_buffer_impl( + const boost::asio::mutable_buffer &buff, + const send_fcn_t &send_fcn + ): + _buff(buff), + _send_fcn(send_fcn) + { + /* NOP */ + } + + ~managed_send_buffer_impl(void){ + /* NOP */ + } + + void commit(size_t num_bytes){ + _send_fcn(boost::asio::buffer(_buff, num_bytes)); + } + +private: + const boost::asio::mutable_buffer &get(void) const{ + return _buff; + } + + const boost::asio::mutable_buffer _buff; + const send_fcn_t _send_fcn; +}; + +//! phony zero-copy send interface implementation +struct phony_zero_copy_send_if::impl{ + boost::uint8_t *send_mem; + managed_send_buffer::sptr send_buff; +}; + +phony_zero_copy_send_if::phony_zero_copy_send_if(size_t max_buff_size){ + _impl = UHD_PIMPL_MAKE(impl, ()); + _impl->send_mem = new boost::uint8_t[max_buff_size]; + _impl->send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( + boost::asio::buffer(_impl->send_mem, max_buff_size), + boost::bind(&phony_zero_copy_send_if::send, this, _1) + )); +} + +phony_zero_copy_send_if::~phony_zero_copy_send_if(void){ + delete [] _impl->send_mem; +} + +managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(void){ + return _impl->send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these +} |