aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport/udp_zero_copy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport/udp_zero_copy.cpp')
-rw-r--r--host/lib/transport/udp_zero_copy.cpp153
1 files changed, 72 insertions, 81 deletions
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp
index 0ccc92b82..166177177 100644
--- a/host/lib/transport/udp_zero_copy.cpp
+++ b/host/lib/transport/udp_zero_copy.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2010-2011 Ettus Research LLC
+// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -18,12 +18,14 @@
#include "udp_common.hpp"
#include <uhd/transport/udp_zero_copy.hpp>
#include <uhd/transport/udp_simple.hpp> //mtu
-#include <uhd/transport/bounded_buffer.hpp>
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/utils/atomic.hpp>
#include <boost/format.hpp>
-#include <list>
+#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp> //sleep
+#include <vector>
using namespace uhd;
using namespace uhd::transport;
@@ -61,66 +63,84 @@ static void check_registry_for_fast_send_threshold(const size_t mtu){
/***********************************************************************
* Reusable managed receiver buffer:
- * - Initialize with memory and a release callback.
- * - Call get new with a length in bytes to re-use.
+ * - get_new performs the recv operation
**********************************************************************/
class udp_zero_copy_asio_mrb : public managed_recv_buffer{
public:
- udp_zero_copy_asio_mrb(void *mem, bounded_buffer<udp_zero_copy_asio_mrb *> &pending):
- _mem(mem), _len(0), _pending(pending){/* NOP */}
+ udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size):
+ _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
void release(void){
- if (_len == 0) return;
- _pending.push_with_haste(this);
- _len = 0;
+ _claimer.release();
}
- sptr get_new(size_t len){
- _len = len;
- return make_managed_buffer(this);
- }
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ if (not _claimer.claim_with_wait(timeout)) return sptr();
- template <class T> T cast(void) const{return static_cast<T>(_mem);}
+ #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported
+ _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT);
+ if (_len > 0){
+ index++; //advances the caller's buffer
+ return make(this, _mem, size_t(_len));
+ }
+ #endif
-private:
- const void *get_buff(void) const{return _mem;}
- size_t get_size(void) const{return _len;}
+ if (wait_for_recv_ready(_sock_fd, timeout)){
+ _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0);
+ index++; //advances the caller's buffer
+ return make(this, _mem, size_t(_len));
+ }
+ _claimer.release(); //undo claim
+ return sptr(); //null for timeout
+ }
+
+private:
void *_mem;
- size_t _len;
- bounded_buffer<udp_zero_copy_asio_mrb *> &_pending;
+ int _sock_fd;
+ size_t _frame_size;
+ ssize_t _len;
+ simple_claimer _claimer;
};
/***********************************************************************
* Reusable managed send buffer:
- * - Initialize with memory and a commit callback.
- * - Call get new with a length in bytes to re-use.
+ * - commit performs the send operation
**********************************************************************/
class udp_zero_copy_asio_msb : public managed_send_buffer{
public:
- udp_zero_copy_asio_msb(void *mem, bounded_buffer<udp_zero_copy_asio_msb *> &pending, int sock_fd):
- _mem(mem), _len(0), _pending(pending), _sock_fd(sock_fd){/* NOP */}
-
- void commit(size_t len){
- if (_len == 0) return;
- ::send(_sock_fd, this->cast<const char *>(), len, 0);
- _pending.push_with_haste(this);
- _len = 0;
+ udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size):
+ _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
+
+ void release(void){
+ //Retry logic because send may fail with ENOBUFS.
+ //This is known to occur at least on some OSX systems.
+ //But it should be safe to always check for the error.
+ while (true)
+ {
+ const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0);
+ if (ret == ssize_t(size())) break;
+ if (ret == -1 and errno == ENOBUFS)
+ {
+ boost::this_thread::sleep(boost::posix_time::microseconds(1));
+ continue; //try to send again
+ }
+ UHD_ASSERT_THROW(ret == ssize_t(size()));
+ }
+ _claimer.release();
}
- sptr get_new(size_t len){
- _len = len;
- return make_managed_buffer(this);
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ if (not _claimer.claim_with_wait(timeout)) return sptr();
+ index++; //advances the caller's buffer
+ return make(this, _mem, _frame_size);
}
private:
- void *get_buff(void) const{return _mem;}
- size_t get_size(void) const{return _len;}
-
void *_mem;
- size_t _len;
- bounded_buffer<udp_zero_copy_asio_msb *> &_pending;
int _sock_fd;
+ size_t _frame_size;
+ simple_claimer _claimer;
};
/***********************************************************************
@@ -145,8 +165,7 @@ public:
_num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
_recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
_send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
- _pending_recv_buffs(_num_recv_frames),
- _pending_send_buffs(_num_send_frames)
+ _next_recv_buff_index(0), _next_send_buff_index(0)
{
UHD_LOG << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -167,18 +186,16 @@ public:
//allocate re-usable managed receive buffers
for (size_t i = 0; i < get_num_recv_frames(); i++){
- _mrb_pool.push_back(udp_zero_copy_asio_mrb(
- _recv_buffer_pool->at(i), _pending_recv_buffs
+ _mrb_pool.push_back(boost::make_shared<udp_zero_copy_asio_mrb>(
+ _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size()
));
- _pending_recv_buffs.push_with_haste(&_mrb_pool.back());
}
//allocate re-usable managed send buffers
for (size_t i = 0; i < get_num_send_frames(); i++){
- _msb_pool.push_back(udp_zero_copy_asio_msb(
- _send_buffer_pool->at(i), _pending_send_buffs, _sock_fd
+ _msb_pool.push_back(boost::make_shared<udp_zero_copy_asio_msb>(
+ _send_buffer_pool->at(i), _sock_fd, get_send_frame_size()
));
- _pending_send_buffs.push_with_haste(&_msb_pool.back());
}
}
@@ -198,29 +215,11 @@ public:
/*******************************************************************
* Receive implementation:
- *
- * Perform a non-blocking receive for performance,
- * and then fall back to a blocking receive with timeout.
- * Return the managed receive buffer with the new length.
- * When the caller is finished with the managed buffer,
- * the managed receive buffer is released back into the queue.
+ * Block on the managed buffer's get call and advance the index.
******************************************************************/
managed_recv_buffer::sptr get_recv_buff(double timeout){
- udp_zero_copy_asio_mrb *mrb = NULL;
- if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
-
- #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported
- ssize_t ret = ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, MSG_DONTWAIT);
- if (ret > 0) return mrb->get_new(ret);
- #endif
-
- if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new(
- ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0)
- );
-
- _pending_recv_buffs.push_with_haste(mrb); //timeout: return the managed buffer to the queue
- }
- return managed_recv_buffer::sptr();
+ if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
}
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
@@ -228,18 +227,11 @@ public:
/*******************************************************************
* Send implementation:
- *
- * Get a managed receive buffer immediately with max length set.
- * The caller will fill the buffer and commit it when finished.
- * The commit routine will perform a blocking send operation,
- * and push the managed send buffer back into the queue.
+ * Block on the managed buffer's get call and advance the index.
******************************************************************/
managed_send_buffer::sptr get_send_buff(double timeout){
- udp_zero_copy_asio_msb *msb = NULL;
- if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
- return msb->get_new(_send_frame_size);
- }
- return managed_send_buffer::sptr();
+ if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
}
size_t get_num_send_frames(void) const {return _num_send_frames;}
@@ -250,10 +242,9 @@ private:
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
- bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;
- bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs;
- std::list<udp_zero_copy_asio_msb> _msb_pool;
- std::list<udp_zero_copy_asio_mrb> _mrb_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool;
+ size_t _next_recv_buff_index, _next_send_buff_index;
//asio guts -> socket and service
asio::io_service _io_service;