//
// Copyright 2010-2011 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//
#include "udp_common.hpp"
#include
#include //mtu
#include
#include
#include
#include
#include
#include
using namespace uhd;
using namespace uhd::transport;
namespace asio = boost::asio;
//A reasonable number of frames for send/recv and async/sync
static const size_t DEFAULT_NUM_FRAMES = 32;
/***********************************************************************
* Reusable managed receiver buffer:
* - Initialize with memory and a release callback.
* - Call get new with a length in bytes to re-use.
**********************************************************************/
class udp_zero_copy_asio_mrb : public managed_recv_buffer{
public:
typedef boost::function release_cb_type;
udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb):
_mem(mem), _len(0), _release_cb(release_cb){/* NOP */}
void release(void){
if (_len == 0) return;
this->_release_cb(this);
_len = 0;
}
sptr get_new(size_t len){
_len = len;
return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter);
}
template T cast(void) const{return static_cast(_mem);}
private:
static void fake_deleter(void *obj){
static_cast(obj)->release();
}
const void *get_buff(void) const{return _mem;}
size_t get_size(void) const{return _len;}
void *_mem;
size_t _len;
release_cb_type _release_cb;
};
/***********************************************************************
* Reusable managed send buffer:
* - Initialize with memory and a commit callback.
* - Call get new with a length in bytes to re-use.
**********************************************************************/
class udp_zero_copy_asio_msb : public managed_send_buffer{
public:
typedef boost::function commit_cb_type;
udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb):
_mem(mem), _len(0), _commit_cb(commit_cb){/* NOP */}
void commit(size_t len){
if (_len == 0) return;
this->_commit_cb(this, len);
_len = 0;
}
sptr get_new(size_t len){
_len = len;
return sptr(this, &udp_zero_copy_asio_msb::fake_deleter);
}
private:
static void fake_deleter(void *obj){
static_cast(obj)->commit(0);
}
void *get_buff(void) const{return _mem;}
size_t get_size(void) const{return _len;}
void *_mem;
size_t _len;
commit_cb_type _commit_cb;
};
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
* This is the portable zero copy implementation for systems
* where a faster, platform specific solution is not available.
* However, it is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
**********************************************************************/
class udp_zero_copy_asio_impl : public udp_zero_copy{
public:
typedef boost::shared_ptr sptr;
udp_zero_copy_asio_impl(
const std::string &addr,
const std::string &port,
const device_addr_t &hints
):
_recv_frame_size(size_t(hints.cast("recv_frame_size", udp_simple::mtu))),
_num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_FRAMES))),
_send_frame_size(size_t(hints.cast("send_frame_size", udp_simple::mtu))),
_num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_FRAMES))),
_recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
_send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
_pending_recv_buffs(_num_recv_frames),
_pending_send_buffs(_num_send_frames)
{
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
//resolve the address
asio::ip::udp::resolver resolver(_io_service);
asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
//create, open, and connect the socket
_socket = socket_sptr(new asio::ip::udp::socket(_io_service));
_socket->open(asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
_sock_fd = _socket->native();
//allocate re-usable managed receive buffers
for (size_t i = 0; i < get_num_recv_frames(); i++){
_mrb_pool.push_back(udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),
boost::bind(&udp_zero_copy_asio_impl::release, this, _1))
);
handle_recv(&_mrb_pool.back());
}
//allocate re-usable managed send buffers
for (size_t i = 0; i < get_num_send_frames(); i++){
_msb_pool.push_back(udp_zero_copy_asio_msb(_send_buffer_pool->at(i),
boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2))
);
handle_send(&_msb_pool.back());
}
}
//get size for internal socket buffer
template size_t get_buff_size(void) const{
Opt option;
_socket->get_option(option);
return option.value();
}
//set size for internal socket buffer
template size_t resize_buff(size_t num_bytes){
Opt option(num_bytes);
_socket->set_option(option);
return get_buff_size();
}
/*******************************************************************
* Receive implementation:
*
* Perform a non-blocking receive for performance,
* and then fall back to a blocking receive with timeout.
* Return the managed receive buffer with the new length.
* When the caller is finished with the managed buffer,
* the managed receive buffer is released back into the queue.
******************************************************************/
managed_recv_buffer::sptr get_recv_buff(double timeout){
udp_zero_copy_asio_mrb *mrb = NULL;
if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
#ifdef MSG_DONTWAIT //try a non-blocking recv() if supported
ssize_t ret = ::recv(_sock_fd, mrb->cast(), _recv_frame_size, MSG_DONTWAIT);
if (ret > 0) return mrb->get_new(ret);
#endif
if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new(
::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0)
);
this->handle_recv(mrb); //timeout: return the managed buffer to the queue
}
return managed_recv_buffer::sptr();
}
UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){
_pending_recv_buffs.push_with_haste(mrb);
}
void release(udp_zero_copy_asio_mrb *mrb){
handle_recv(mrb);
}
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
size_t get_recv_frame_size(void) const {return _recv_frame_size;}
/*******************************************************************
* Send implementation:
*
* Get a managed receive buffer immediately with max length set.
* The caller will fill the buffer and commit it when finished.
* The commit routine will perform a blocking send operation,
* and push the managed send buffer back into the queue.
******************************************************************/
managed_send_buffer::sptr get_send_buff(double timeout){
udp_zero_copy_asio_msb *msb = NULL;
if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){
return msb->get_new(_send_frame_size);
}
return managed_send_buffer::sptr();
}
UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){
_pending_send_buffs.push_with_haste(msb);
}
void commit(udp_zero_copy_asio_msb *msb, size_t len){
::send(_sock_fd, msb->cast(), len, 0);
handle_send(msb);
}
size_t get_num_send_frames(void) const {return _num_send_frames;}
size_t get_send_frame_size(void) const {return _send_frame_size;}
private:
//memory management -> buffers and fifos
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
bounded_buffer _pending_recv_buffs;
bounded_buffer _pending_send_buffs;
std::list _msb_pool;
std::list _mrb_pool;
//asio guts -> socket and service
asio::io_service _io_service;
socket_sptr _socket;
int _sock_fd;
};
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
template static void resize_buff_helper(
udp_zero_copy_asio_impl::sptr udp_trans,
const size_t target_size,
const std::string &name
){
std::string help_message;
#if defined(UHD_PLATFORM_LINUX)
help_message = str(boost::format(
"Please run: sudo sysctl -w net.core.%smem_max=%d\n"
) % ((name == "recv")?"r":"w") % target_size);
#endif /*defined(UHD_PLATFORM_LINUX)*/
//resize the buffer if size was provided
if (target_size > 0){
size_t actual_size = udp_trans->resize_buff(target_size);
if (target_size != actual_size) std::cout << boost::format(
"Target %s sock buff size: %d bytes\n"
"Actual %s sock buff size: %d bytes"
) % name % target_size % name % actual_size << std::endl;
else std::cout << boost::format(
"Current %s sock buff size: %d bytes"
) % name % actual_size << std::endl;
if (actual_size < target_size) uhd::warning::post(str(boost::format(
"The %s buffer could not be resized sufficiently.\n"
"See the transport application notes on buffer resizing.\n%s"
) % name % help_message));
}
}
udp_zero_copy::sptr udp_zero_copy::make(
const std::string &addr,
const std::string &port,
const device_addr_t &hints
){
udp_zero_copy_asio_impl::sptr udp_trans(
new udp_zero_copy_asio_impl(addr, port, hints)
);
//extract buffer size hints from the device addr
size_t recv_buff_size = size_t(hints.cast("recv_buff_size", 0.0));
size_t send_buff_size = size_t(hints.cast("send_buff_size", 0.0));
//call the helper to resize send and recv buffers
resize_buff_helper(udp_trans, recv_buff_size, "recv");
resize_buff_helper (udp_trans, send_buff_size, "send");
return udp_trans;
}