aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2011-02-05 00:05:30 -0800
committerJosh Blum <josh@joshknows.com>2011-02-05 00:05:30 -0800
commit05c2b1b544b76babbdb0210ae6a8d5de19e247bb (patch)
tree98cbe5d5ecc3146cd0fae0c2f6a90b97f0bd8c8d /host
parent6c145193a955cfd6295b11bed3cc7cc9828feb9c (diff)
downloaduhd-05c2b1b544b76babbdb0210ae6a8d5de19e247bb.tar.gz
uhd-05c2b1b544b76babbdb0210ae6a8d5de19e247bb.tar.bz2
uhd-05c2b1b544b76babbdb0210ae6a8d5de19e247bb.zip
udp: simplfy zero copy asio overhead with less shared_from_this, and timed waits when not needed
Diffstat (limited to 'host')
-rw-r--r--host/include/uhd/transport/bounded_buffer.hpp9
-rw-r--r--host/include/uhd/transport/bounded_buffer.ipp11
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp44
3 files changed, 43 insertions, 21 deletions
diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp
index aca93b071..33ded8cfd 100644
--- a/host/include/uhd/transport/bounded_buffer.hpp
+++ b/host/include/uhd/transport/bounded_buffer.hpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010 Ettus Research LLC
+// Copyright 2010-2011 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -66,6 +66,13 @@ namespace uhd{ namespace transport{
virtual bool push_with_timed_wait(const elem_type &elem, double timeout) = 0;
/*!
+ * Pop an element from the bounded_buffer immediately.
+ * \param elem the element reference pop to
+ * \return false when the bounded_buffer is empty
+ */
+ virtual bool pop_with_haste(elem_type &elem) = 0;
+
+ /*!
* Pop an element from the bounded_buffer.
* Wait until the bounded_buffer becomes non-empty.
* \param elem the element reference pop to
diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp
index 4fbe3f085..7bf182ee3 100644
--- a/host/include/uhd/transport/bounded_buffer.ipp
+++ b/host/include/uhd/transport/bounded_buffer.ipp
@@ -1,5 +1,5 @@
//
-// Copyright 2010 Ettus Research LLC
+// Copyright 2010-2011 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -72,6 +72,15 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
return true;
}
+ UHD_INLINE bool pop_with_haste(elem_type &elem){
+ boost::mutex::scoped_lock lock(_mutex);
+ if(_buffer.empty()) return false;
+ elem = this->pop_back();
+ lock.unlock();
+ _full_cond.notify_one();
+ return true;
+ }
+
UHD_INLINE void pop_with_wait(elem_type &elem){
boost::mutex::scoped_lock lock(_mutex);
_empty_cond.wait(lock, _not_empty_fcn);
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 697e172cd..f083b97d8 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -34,7 +34,7 @@ namespace asio = boost::asio;
//Define this to the the boost async io calls to perform receive.
//Otherwise, get_recv_buff uses a blocking receive with timeout.
-#define USE_ASIO_ASYNC_RECV
+//#define USE_ASIO_ASYNC_RECV
//Define this to the the boost async io calls to perform send.
//Otherwise, the commit callback uses a blocking send.
@@ -138,8 +138,7 @@ public:
//! handle a recv callback -> push the filled memory into the fifo
UHD_INLINE void handle_recv(void *mem, size_t len){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+ _pending_recv_buffs->push_with_pop_on_full(boost::asio::buffer(mem, len));
}
////////////////////////////////////////////////////////////////////
@@ -152,8 +151,7 @@ public:
if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){
return managed_recv_buffer::make_safe(
buff, boost::bind(
- &udp_zero_copy_asio_impl::release,
- shared_from_this(),
+ &udp_zero_copy_asio_impl::release, this,
asio::buffer_cast<void*>(buff)
)
);
@@ -177,7 +175,6 @@ public:
#else /*USE_ASIO_ASYNC_RECV*/
////////////////////////////////////////////////////////////////////
managed_recv_buffer::sptr get_recv_buff(double timeout){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
asio::mutable_buffer buff;
//setup timeval for timeout
@@ -190,11 +187,11 @@ public:
FD_ZERO(&rset);
FD_SET(_sock_fd, &rset);
- //call select to perform timed wait and grab an available buffer with wait
+ //call select to perform timed wait and grab an available buffer now
//if the condition is true, call receive and return the managed buffer
if (
- ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and
- _pending_recv_buffs->pop_with_timed_wait(buff, timeout)
+ ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0
+ and _pending_recv_buffs->pop_with_haste(buff)
){
return managed_recv_buffer::make_safe(
asio::buffer(
@@ -202,8 +199,7 @@ public:
_socket->receive(asio::buffer(buff))
),
boost::bind(
- &udp_zero_copy_asio_impl::release,
- shared_from_this(),
+ &udp_zero_copy_asio_impl::release, this,
asio::buffer_cast<void*>(buff)
)
);
@@ -212,7 +208,6 @@ public:
}
void release(void *mem){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
handle_recv(mem, this->get_recv_frame_size());
}
@@ -225,10 +220,12 @@ public:
//! handle a send callback -> push the emptied memory into the fifo
UHD_INLINE void handle_send(void *mem){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size()));
+ _pending_send_buffs->push_with_pop_on_full(boost::asio::buffer(mem, this->get_send_frame_size()));
}
+ ////////////////////////////////////////////////////////////////////
+ #ifdef USE_ASIO_ASYNC_SEND
+ ////////////////////////////////////////////////////////////////////
//! pop an empty send buffer off of the fifo and bind with the commit callback
managed_send_buffer::sptr get_send_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -236,8 +233,7 @@ public:
if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){
return managed_send_buffer::make_safe(
buff, boost::bind(
- &udp_zero_copy_asio_impl::commit,
- shared_from_this(),
+ &udp_zero_copy_asio_impl::commit, this,
asio::buffer_cast<void*>(buff), _1
)
);
@@ -245,9 +241,6 @@ public:
return managed_send_buffer::sptr();
}
- ////////////////////////////////////////////////////////////////////
- #ifdef USE_ASIO_ASYNC_SEND
- ////////////////////////////////////////////////////////////////////
//! commit a send buffer -> start an async send on the buffer
void commit(void *mem, size_t len){
_socket->async_send(
@@ -262,6 +255,19 @@ public:
////////////////////////////////////////////////////////////////////
#else /*USE_ASIO_ASYNC_SEND*/
////////////////////////////////////////////////////////////////////
+ managed_send_buffer::sptr get_send_buff(double){
+ asio::mutable_buffer buff;
+ if (_pending_send_buffs->pop_with_haste(buff)){
+ return managed_send_buffer::make_safe(
+ buff, boost::bind(
+ &udp_zero_copy_asio_impl::commit, this,
+ asio::buffer_cast<void*>(buff), _1
+ )
+ );
+ }
+ return managed_send_buffer::sptr();
+ }
+
void commit(void *mem, size_t len){
_socket->send(asio::buffer(mem, len));
handle_send(mem);