summaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp195
-rw-r--r--host/lib/transport/vrt_packet_handler.hpp27
-rw-r--r--host/lib/transport/zero_copy.cpp139
4 files changed, 235 insertions, 127 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index ed8c35225..a74f7d527 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -50,4 +50,5 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_SOURCE_DIR}/lib/transport/udp_simple.cpp
${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp
${CMAKE_SOURCE_DIR}/lib/transport/vrt_packet_handler.hpp
+ ${CMAKE_SOURCE_DIR}/lib/transport/zero_copy.cpp
)
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index f8a222475..ced606777 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -28,58 +28,8 @@ using namespace uhd::transport;
* Constants
**********************************************************************/
static const size_t MIN_SOCK_BUFF_SIZE = size_t(100e3);
-static const size_t MAX_DGRAM_SIZE = 2048; //assume max size on send and recv
-static const double RECV_TIMEOUT = 0.1; // 100 ms
-
-/***********************************************************************
- * Managed receive buffer implementation for udp zero-copy asio:
- **********************************************************************/
-class managed_recv_buffer_impl : public managed_recv_buffer{
-public:
- managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){
- /* NOP */
- }
-
- ~managed_recv_buffer_impl(void){
- delete [] this->cast<const boost::uint8_t *>();
- }
-
-private:
- const boost::asio::const_buffer &get(void) const{
- return _buff;
- }
-
- const boost::asio::const_buffer _buff;
-};
-
-/***********************************************************************
- * Managed send buffer implementation for udp zero-copy asio:
- **********************************************************************/
-class managed_send_buffer_impl : public managed_send_buffer{
-public:
- managed_send_buffer_impl(
- const boost::asio::mutable_buffer &buff,
- boost::asio::ip::udp::socket *socket
- ) : _buff(buff), _socket(socket){
- /* NOP */
- }
-
- ~managed_send_buffer_impl(void){
- /* NOP */
- }
-
- void commit(size_t num_bytes){
- _socket->send(boost::asio::buffer(_buff, num_bytes));
- }
-
-private:
- const boost::asio::mutable_buffer &get(void) const{
- return _buff;
- }
-
- const boost::asio::mutable_buffer _buff;
- boost::asio::ip::udp::socket *_socket;
-};
+static const size_t MAX_DGRAM_SIZE = 1500; //assume max size on send and recv
+static const double RECV_TIMEOUT = 0.1; //100 ms
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
@@ -88,95 +38,108 @@ private:
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
-class udp_zero_copy_impl : public udp_zero_copy{
+class udp_zero_copy_impl:
+ public phony_zero_copy_recv_if,
+ public phony_zero_copy_send_if,
+ public udp_zero_copy
+{
public:
typedef boost::shared_ptr<udp_zero_copy_impl> sptr;
- //structors
- udp_zero_copy_impl(const std::string &addr, const std::string &port);
- ~udp_zero_copy_impl(void);
+ udp_zero_copy_impl(
+ const std::string &addr,
+ const std::string &port
+ ):
+ phony_zero_copy_recv_if(MAX_DGRAM_SIZE),
+ phony_zero_copy_send_if(MAX_DGRAM_SIZE)
+ {
+ //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
+
+ // resolve the address
+ boost::asio::ip::udp::resolver resolver(_io_service);
+ boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
+ boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
+
+ // create, open, and connect the socket
+ _socket = new boost::asio::ip::udp::socket(_io_service);
+ _socket->open(boost::asio::ip::udp::v4());
+ _socket->connect(receiver_endpoint);
+ _sock_fd = _socket->native();
+ }
- //send/recv
- managed_recv_buffer::sptr get_recv_buff(void);
- managed_send_buffer::sptr get_send_buff(void);
+ ~udp_zero_copy_impl(void){
+ delete _socket;
+ }
- //manage buffer
- template <typename Opt> size_t get_buff_size(void){
+ //get size for internal socket buffer
+ template <typename Opt> size_t get_buff_size(void) const{
Opt option;
_socket->get_option(option);
return option.value();
}
+ //set size for internal socket buffer
template <typename Opt> size_t resize_buff(size_t num_bytes){
Opt option(num_bytes);
_socket->set_option(option);
return get_buff_size<Opt>();
}
-private:
- boost::asio::ip::udp::socket *_socket;
- boost::asio::io_service _io_service;
-
- //send and recv buffer memory (allocated once)
- boost::uint8_t _send_mem[MIN_SOCK_BUFF_SIZE];
-
- managed_send_buffer::sptr _send_buff;
-};
-udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){
- //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
-
- // resolve the address
- boost::asio::ip::udp::resolver resolver(_io_service);
- boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
- boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
-
- // create, open, and connect the socket
- _socket = new boost::asio::ip::udp::socket(_io_service);
- _socket->open(boost::asio::ip::udp::v4());
- _socket->connect(receiver_endpoint);
-
- // create the managed send buff (just once)
- _send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl(
- boost::asio::buffer(_send_mem, MIN_SOCK_BUFF_SIZE), _socket
- ));
-
- // set recv timeout
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = size_t(RECV_TIMEOUT*1e6);
- UHD_ASSERT_THROW(setsockopt(
- _socket->native(),
- SOL_SOCKET, SO_RCVTIMEO,
- (const char *)&tv, sizeof(timeval)
- ) == 0);
-}
+ //The number of frames is approximately the buffer size divided by the max datagram size.
+ //In reality, this is a phony zero-copy interface and the number of frames is infinite.
+ //However, its sensible to advertise a frame count that is approximate to buffer size.
+ //This way, the transport caller will have an idea about how much buffering to create.
-udp_zero_copy_impl::~udp_zero_copy_impl(void){
- delete _socket;
-}
-
-managed_recv_buffer::sptr udp_zero_copy_impl::get_recv_buff(void){
- //allocate memory
- boost::uint8_t *recv_mem = new boost::uint8_t[MAX_DGRAM_SIZE];
+ size_t get_num_recv_frames(void) const{
+ return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/MAX_DGRAM_SIZE;
+ }
- //call recv() with timeout option
- size_t num_bytes = _socket->receive(boost::asio::buffer(recv_mem, MIN_SOCK_BUFF_SIZE));
+ size_t get_num_send_frames(void) const{
+ return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/MAX_DGRAM_SIZE;
+ }
- //create a new managed buffer to house the data
- return managed_recv_buffer::sptr(
- new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes))
- );
-}
+private:
+ boost::asio::ip::udp::socket *_socket;
+ boost::asio::io_service _io_service;
+ int _sock_fd;
+
+ size_t recv(const boost::asio::mutable_buffer &buff){
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = int(RECV_TIMEOUT*1e6);
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(_sock_fd, &rset);
+
+ //call select to perform timed wait
+ if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0;
+
+ return ::recv(
+ _sock_fd,
+ boost::asio::buffer_cast<char *>(buff),
+ boost::asio::buffer_size(buff),
+ 0
+ );
+ }
-managed_send_buffer::sptr udp_zero_copy_impl::get_send_buff(void){
- return _send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these
-}
+ size_t send(const boost::asio::const_buffer &buff){
+ return ::send(
+ _sock_fd,
+ boost::asio::buffer_cast<const char *>(buff),
+ boost::asio::buffer_size(buff),
+ 0
+ );
+ }
+};
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
-template<typename Opt> static inline void resize_buff_helper(
+template<typename Opt> static void resize_buff_helper(
udp_zero_copy_impl::sptr udp_trans,
size_t target_size,
const std::string &name
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp
index e64e3383d..d6b863040 100644
--- a/host/lib/transport/vrt_packet_handler.hpp
+++ b/host/lib/transport/vrt_packet_handler.hpp
@@ -54,6 +54,8 @@ namespace vrt_packet_handler{
}
};
+ typedef boost::function<uhd::transport::managed_recv_buffer::sptr(void)> get_recv_buff_t;
+
typedef boost::function<void(uhd::transport::managed_recv_buffer::sptr)> recv_cb_t;
static UHD_INLINE void recv_cb_nop(uhd::transport::managed_recv_buffer::sptr){
@@ -112,15 +114,16 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_recv_buff_t &get_recv_buff,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32,
- const recv_cb_t& recv_cb
+ const recv_cb_t &recv_cb
){
//perform a receive if no rx data is waiting to be copied
if (boost::asio::buffer_size(state.copy_buff) == 0){
state.fragment_offset_in_samps = 0;
- state.managed_buff = zc_iface->get_recv_buff();
+ state.managed_buff = get_recv_buff();
+ if (state.managed_buff.get() == NULL) return 0;
recv_cb(state.managed_buff); //callback before vrt unpack
try{
_recv1_helper(
@@ -169,7 +172,7 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_recv_buff_t &get_recv_buff,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32 = 0,
const recv_cb_t& recv_cb = &recv_cb_nop
@@ -189,7 +192,7 @@ namespace vrt_packet_handler{
metadata,
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_recv_buff,
vrt_header_offset_words32,
recv_cb
);
@@ -208,7 +211,7 @@ namespace vrt_packet_handler{
(accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_recv_buff,
vrt_header_offset_words32,
recv_cb
);
@@ -234,6 +237,8 @@ namespace vrt_packet_handler{
}
};
+ typedef boost::function<uhd::transport::managed_send_buffer::sptr(void)> get_send_buff_t;
+
typedef boost::function<void(uhd::transport::managed_send_buffer::sptr)> send_cb_t;
static UHD_INLINE void send_cb_nop(uhd::transport::managed_send_buffer::sptr){
@@ -252,12 +257,12 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_send_buff_t &get_send_buff,
size_t vrt_header_offset_words32,
const send_cb_t& send_cb
){
//get a new managed send buffer
- uhd::transport::managed_send_buffer::sptr send_buff = zc_iface->get_send_buff();
+ uhd::transport::managed_send_buffer::sptr send_buff = get_send_buff();
boost::uint32_t *tx_mem = send_buff->cast<boost::uint32_t *>() + vrt_header_offset_words32;
size_t num_header_words32, num_packet_words32;
@@ -298,7 +303,7 @@ namespace vrt_packet_handler{
const uhd::io_type_t &io_type,
const uhd::otw_type_t &otw_type,
double tick_rate,
- uhd::transport::zero_copy_if::sptr zc_iface,
+ const get_send_buff_t &get_send_buff,
size_t max_samples_per_packet,
//use these two params to handle a layer above vrt
size_t vrt_header_offset_words32 = 0,
@@ -319,7 +324,7 @@ namespace vrt_packet_handler{
metadata,
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_send_buff,
vrt_header_offset_words32,
send_cb
);
@@ -353,7 +358,7 @@ namespace vrt_packet_handler{
md,
io_type, otw_type,
tick_rate,
- zc_iface,
+ get_send_buff,
vrt_header_offset_words32,
send_cb
);
diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp
new file mode 100644
index 000000000..27f41329b
--- /dev/null
+++ b/host/lib/transport/zero_copy.cpp
@@ -0,0 +1,139 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/cstdint.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+
+using namespace uhd::transport;
+
+/***********************************************************************
+ * The pure-virtual deconstructor needs an implementation to be happy
+ **********************************************************************/
+managed_recv_buffer::~managed_recv_buffer(void){
+ /* NOP */
+}
+
+/***********************************************************************
+ * Phony zero-copy recv interface implementation
+ **********************************************************************/
+
+//! phony zero-copy recv buffer implementation
+class managed_recv_buffer_impl : public managed_recv_buffer{
+public:
+ managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){
+ /* NOP */
+ }
+
+ ~managed_recv_buffer_impl(void){
+ delete [] this->cast<const boost::uint8_t *>();
+ }
+
+private:
+ const boost::asio::const_buffer &get(void) const{
+ return _buff;
+ }
+
+ const boost::asio::const_buffer _buff;
+};
+
+//! phony zero-copy recv interface implementation
+struct phony_zero_copy_recv_if::impl{
+ size_t max_buff_size;
+};
+
+phony_zero_copy_recv_if::phony_zero_copy_recv_if(size_t max_buff_size){
+ _impl = UHD_PIMPL_MAKE(impl, ());
+ _impl->max_buff_size = max_buff_size;
+}
+
+phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){
+ /* NOP */
+}
+
+managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(void){
+ //allocate memory
+ boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size];
+
+ //call recv() with timeout option
+ size_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size));
+
+ //create a new managed buffer to house the data
+ return managed_recv_buffer::sptr(
+ new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes))
+ );
+}
+
+/***********************************************************************
+ * Phony zero-copy send interface implementation
+ **********************************************************************/
+
+//! phony zero-copy send buffer implementation
+class managed_send_buffer_impl : public managed_send_buffer{
+public:
+ typedef boost::function<size_t(const boost::asio::const_buffer &)> send_fcn_t;
+
+ managed_send_buffer_impl(
+ const boost::asio::mutable_buffer &buff,
+ const send_fcn_t &send_fcn
+ ):
+ _buff(buff),
+ _send_fcn(send_fcn)
+ {
+ /* NOP */
+ }
+
+ ~managed_send_buffer_impl(void){
+ /* NOP */
+ }
+
+ void commit(size_t num_bytes){
+ _send_fcn(boost::asio::buffer(_buff, num_bytes));
+ }
+
+private:
+ const boost::asio::mutable_buffer &get(void) const{
+ return _buff;
+ }
+
+ const boost::asio::mutable_buffer _buff;
+ const send_fcn_t _send_fcn;
+};
+
+//! phony zero-copy send interface implementation
+struct phony_zero_copy_send_if::impl{
+ boost::uint8_t *send_mem;
+ managed_send_buffer::sptr send_buff;
+};
+
+phony_zero_copy_send_if::phony_zero_copy_send_if(size_t max_buff_size){
+ _impl = UHD_PIMPL_MAKE(impl, ());
+ _impl->send_mem = new boost::uint8_t[max_buff_size];
+ _impl->send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl(
+ boost::asio::buffer(_impl->send_mem, max_buff_size),
+ boost::bind(&phony_zero_copy_send_if::send, this, _1)
+ ));
+}
+
+phony_zero_copy_send_if::~phony_zero_copy_send_if(void){
+ delete [] _impl->send_mem;
+}
+
+managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(void){
+ return _impl->send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these
+}