aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2010-10-08 11:20:30 -0700
committerJosh Blum <josh@joshknows.com>2010-10-08 11:20:30 -0700
commite8e9258acb8ce8499c40de52abaa4e6ba83d479d (patch)
tree622fc5a2d23dd56e1228679b9dbf1b674881833f /host
parent201af62ccb4d1065a44822f0f3ce4c81755510b5 (diff)
downloaduhd-e8e9258acb8ce8499c40de52abaa4e6ba83d479d.tar.gz
uhd-e8e9258acb8ce8499c40de52abaa4e6ba83d479d.tar.bz2
uhd-e8e9258acb8ce8499c40de52abaa4e6ba83d479d.zip
udp: worked blocking send back into udp transport, enable async with #define
tweaked some code in bounded buffer
Diffstat (limited to 'host')
-rw-r--r--host/include/uhd/transport/alignment_buffer.ipp8
-rw-r--r--host/include/uhd/transport/bounded_buffer.ipp39
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp114
3 files changed, 99 insertions, 62 deletions
diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp
index 5f09de0d9..833b5d399 100644
--- a/host/include/uhd/transport/alignment_buffer.ipp
+++ b/host/include/uhd/transport/alignment_buffer.ipp
@@ -54,14 +54,14 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
UHD_INLINE bool pop_elems_with_timed_wait(
std::vector<elem_type> &elems, double timeout
){
- boost::system_time exit_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1e6));
+ boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout);
buff_contents_type buff_contents_tmp;
std::list<size_t> indexes_to_do(_all_indexes);
//do an initial pop to load an initial sequence id
size_t index = indexes_to_do.front();
if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()
+ buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
)) return false;
elems[index] = buff_contents_tmp.first;
seq_type expected_seq_id = buff_contents_tmp.second;
@@ -76,7 +76,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
indexes_to_do = _all_indexes;
index = indexes_to_do.front();
if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()
+ buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
)) return false;
elems[index] = buff_contents_tmp.first;
expected_seq_id = buff_contents_tmp.second;
@@ -86,7 +86,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
//pop an element off for this index
index = indexes_to_do.front();
if (not _buffs[index]->pop_with_timed_wait(
- buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()
+ buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())
)) return false;
//if the sequence id matches:
diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp
index 58f78bab4..edc7faa06 100644
--- a/host/include/uhd/transport/bounded_buffer.ipp
+++ b/host/include/uhd/transport/bounded_buffer.ipp
@@ -19,18 +19,28 @@
#define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP
#include <boost/bind.hpp>
+#include <boost/function.hpp>
#include <boost/circular_buffer.hpp>
#include <boost/thread/condition.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
namespace uhd{ namespace transport{ namespace{ /*anon*/
+ static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){
+ return boost::posix_time::microseconds(long(timeout*1e6));
+ }
+
+ static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){
+ return 1e-6*time_dur.total_microseconds();
+ }
+
template <typename elem_type>
class bounded_buffer_impl : public bounded_buffer<elem_type>{
public:
bounded_buffer_impl(size_t capacity) : _buffer(capacity){
- /* NOP */
+ _not_full_fcn = boost::bind(&bounded_buffer_impl<elem_type>::not_full, this);
+ _not_empty_fcn = boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this);
}
UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){
@@ -52,17 +62,16 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
UHD_INLINE void push_with_wait(const elem_type &elem){
boost::unique_lock<boost::mutex> lock(_mutex);
- _full_cond.wait(lock, boost::bind(&bounded_buffer_impl<elem_type>::not_full, this));
+ _full_cond.wait(lock, _not_full_fcn);
_buffer.push_front(elem);
lock.unlock();
_empty_cond.notify_one();
}
- bool push_with_timed_wait(const elem_type &elem, double timeout){
+ UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){
boost::unique_lock<boost::mutex> lock(_mutex);
if (not _full_cond.timed_wait(
- lock, boost::posix_time::microseconds(long(timeout*1e6)),
- boost::bind(&bounded_buffer_impl<elem_type>::not_full, this)
+ lock, to_time_dur(timeout), _not_full_fcn
)) return false;
_buffer.push_front(elem);
lock.unlock();
@@ -72,19 +81,18 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
UHD_INLINE void pop_with_wait(elem_type &elem){
boost::unique_lock<boost::mutex> lock(_mutex);
- _empty_cond.wait(lock, boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this));
- this->pop_back(elem);
+ _empty_cond.wait(lock, _not_empty_fcn);
+ elem = this->pop_back();
lock.unlock();
_full_cond.notify_one();
}
- bool pop_with_timed_wait(elem_type &elem, double timeout){
+ UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){
boost::unique_lock<boost::mutex> lock(_mutex);
if (not _empty_cond.timed_wait(
- lock, boost::posix_time::microseconds(long(timeout*1e6)),
- boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this)
+ lock, to_time_dur(timeout), _not_empty_fcn
)) return false;
- this->pop_back(elem);
+ elem = this->pop_back();
lock.unlock();
_full_cond.notify_one();
return true;
@@ -92,7 +100,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
UHD_INLINE void clear(void){
boost::unique_lock<boost::mutex> lock(_mutex);
- while (not_empty()) _buffer.pop_back();
+ while (not_empty()) this->pop_back();
lock.unlock();
_full_cond.notify_one();
}
@@ -105,16 +113,19 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/
bool not_full(void) const{return not _buffer.full();}
bool not_empty(void) const{return not _buffer.empty();}
+ boost::function<bool(void)> _not_full_fcn, _not_empty_fcn;
+
/*!
* Three part operation to pop an element:
* 1) assign elem to the back element
* 2) assign the back element to empty
* 3) pop the back to move the counter
*/
- UHD_INLINE void pop_back(elem_type &elem){
- elem = _buffer.back();
+ UHD_INLINE elem_type pop_back(void){
+ elem_type elem = _buffer.back();
_buffer.back() = elem_type();
_buffer.pop_back();
+ return elem;
}
};
}}} //namespace
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index 40fe3fa0f..d84aeefdd 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -39,6 +39,10 @@ namespace asio = boost::asio;
//Otherwise, get_recv_buff uses a blocking receive with timeout.
//#define USE_ASIO_ASYNC_RECV
+//Define this to the the boost async io calls to perform send.
+//Otherwise, the commit callback uses a blocking send.
+//#define USE_ASIO_ASYNC_SEND
+
//enough buffering for half a second of samples at full rate on usrp2
static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5);
@@ -55,7 +59,13 @@ static const size_t DEFAULT_NUM_RECV_FRAMES = 32;
#else
static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu;
#endif
+//The non-async send only ever requires a single frame
+//because the buffer will be committed before a new get.
+#ifdef USE_ASIO_ASYNC_SEND
static const size_t DEFAULT_NUM_SEND_FRAMES = 32;
+#else
+static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;;
+#endif
//a single concurrent thread for io_service seems to be the fastest
static const size_t CONCURRENCY_HINT = 1;
@@ -143,6 +153,12 @@ public:
return get_buff_size<Opt>();
}
+ //! handle a recv callback -> push the filled memory into the fifo
+ UHD_INLINE void handle_recv(void *mem, size_t len){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+ }
+
////////////////////////////////////////////////////////////////////
#ifdef USE_ASIO_ASYNC_RECV
////////////////////////////////////////////////////////////////////
@@ -162,16 +178,10 @@ public:
return managed_recv_buffer::sptr();
}
- //! handle a recv callback -> push the filled memory into the fifo
- void handle_recv(void *mem, size_t len){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
- }
-
//! release a recv buffer -> start an async recv on the buffer
void release(void *mem){
_socket->async_receive(
- boost::asio::buffer(mem, _recv_frame_size),
+ boost::asio::buffer(mem, this->get_recv_frame_size()),
boost::bind(
&udp_zero_copy_asio_impl::handle_recv,
shared_from_this(), mem,
@@ -185,6 +195,7 @@ public:
////////////////////////////////////////////////////////////////////
managed_recv_buffer::sptr get_recv_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
+ asio::mutable_buffer buff;
//setup timeval for timeout
timeval tv;
@@ -196,34 +207,30 @@ public:
FD_ZERO(&rset);
FD_SET(_sock_fd, &rset);
- //call select to perform timed wait
- if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0)
- return managed_recv_buffer::sptr();
-
- //grab an available buffer
- asio::mutable_buffer buff;
- if (not _pending_recv_buffs->pop_with_timed_wait(buff, timeout))
- return managed_recv_buffer::sptr();
-
- //receive and return the buffer
- return managed_recv_buffer::make_safe(
- asio::buffer(
- boost::asio::buffer_cast<void *>(buff),
- _socket->receive(boost::asio::buffer(buff))
- ),
- boost::bind(
- &udp_zero_copy_asio_impl::release,
- shared_from_this(),
- asio::buffer_cast<void*>(buff)
- )
- );
+ //call select to perform timed wait and grab an available buffer with wait
+ //if the condition is true, call receive and return the managed buffer
+ if (
+ ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and
+ _pending_recv_buffs->pop_with_timed_wait(buff, timeout)
+ ){
+ return managed_recv_buffer::make_safe(
+ asio::buffer(
+ boost::asio::buffer_cast<void *>(buff),
+ _socket->receive(asio::buffer(buff))
+ ),
+ boost::bind(
+ &udp_zero_copy_asio_impl::release,
+ shared_from_this(),
+ asio::buffer_cast<void*>(buff)
+ )
+ );
+ }
+ return managed_recv_buffer::sptr();
}
void release(void *mem){
boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_recv_buffs->push_with_wait(
- boost::asio::buffer(mem, this->get_recv_frame_size())
- );
+ handle_recv(mem, this->get_recv_frame_size());
}
////////////////////////////////////////////////////////////////////
@@ -233,6 +240,12 @@ public:
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
size_t get_recv_frame_size(void) const {return _recv_frame_size;}
+ //! handle a send callback -> push the emptied memory into the fifo
+ UHD_INLINE void handle_send(void *mem){
+ boost::this_thread::disable_interruption di; //disable because the wait can throw
+ _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size()));
+ }
+
//! pop an empty send buffer off of the fifo and bind with the commit callback
managed_send_buffer::sptr get_send_buff(double timeout){
boost::this_thread::disable_interruption di; //disable because the wait can throw
@@ -249,12 +262,9 @@ public:
return managed_send_buffer::sptr();
}
- //! handle a send callback -> push the emptied memory into the fifo
- void handle_send(void *mem){
- boost::this_thread::disable_interruption di; //disable because the wait can throw
- _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size));
- }
-
+ ////////////////////////////////////////////////////////////////////
+ #ifdef USE_ASIO_ASYNC_SEND
+ ////////////////////////////////////////////////////////////////////
//! commit a send buffer -> start an async send on the buffer
void commit(void *mem, size_t len){
_socket->async_send(
@@ -266,6 +276,18 @@ public:
);
}
+ ////////////////////////////////////////////////////////////////////
+ #else /*USE_ASIO_ASYNC_SEND*/
+ ////////////////////////////////////////////////////////////////////
+ void commit(void *mem, size_t len){
+ _socket->send(asio::buffer(mem, len));
+ handle_send(mem);
+ }
+
+ ////////////////////////////////////////////////////////////////////
+ #endif /*USE_ASIO_ASYNC_SEND*/
+ ////////////////////////////////////////////////////////////////////
+
size_t get_num_send_frames(void) const {return _num_send_frames;}
size_t get_send_frame_size(void) const {return _send_frame_size;}
@@ -297,6 +319,13 @@ template<typename Opt> static void resize_buff_helper(
if (name == "recv") min_sock_buff_size = MIN_RECV_SOCK_BUFF_SIZE;
if (name == "send") min_sock_buff_size = MIN_SEND_SOCK_BUFF_SIZE;
+ std::string help_message;
+ #if defined(UHD_PLATFORM_LINUX)
+ help_message = str(boost::format(
+ "Please run: sudo sysctl -w net.core.%smem_max=%d\n"
+ ) % ((name == "recv")?"r":"w") % min_sock_buff_size);
+ #endif /*defined(UHD_PLATFORM_LINUX)*/
+
//resize the buffer if size was provided
if (target_size > 0){
size_t actual_size = udp_trans->resize_buff<Opt>(target_size);
@@ -308,13 +337,10 @@ template<typename Opt> static void resize_buff_helper(
"Current %s sock buff size: %d bytes"
) % name % actual_size << std::endl;
if (actual_size < target_size) uhd::print_warning(str(boost::format(
- "The %1% buffer is smaller than the requested size.\n"
- "The minimum recommended buffer size is %2% bytes.\n"
- "See the transport application notes on buffer resizing.\n"
- #if defined(UHD_PLATFORM_LINUX)
- "Please run: sudo sysctl -w net.core.rmem_max=%2%\n"
- #endif /*defined(UHD_PLATFORM_LINUX)*/
- ) % name % min_sock_buff_size));
+ "The %s buffer is smaller than the requested size.\n"
+ "The minimum recommended buffer size is %d bytes.\n"
+ "See the transport application notes on buffer resizing.\n%s"
+ ) % name % min_sock_buff_size % help_message));
}
//only enable on platforms that are happy with the large buffer resize