//
// Copyright 2010-2011 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
#include "udp_common.hpp"
#include
#include //mtu
#include
#include
#include
#include
#include
#include
using namespace uhd;
using namespace uhd::transport;
namespace asio = boost::asio;
//A reasonable number of frames for send/recv and async/sync.
//Used as the fallback when the "num_recv_frames" or "num_send_frames"
//hint is not supplied to the transport constructor.
static const size_t DEFAULT_NUM_FRAMES = 32;
/***********************************************************************
* Check registry for correct fast-path setting
**********************************************************************/
#include //CRegKey
static void check_registry_for_fast_send_threshold(const size_t mtu){
static bool warned = false;
if (warned) return; //only allow one printed warning per process
CRegKey reg_key;
DWORD threshold = 1024; //system default when threshold is not specified
if (
reg_key.Open(HKEY_LOCAL_MACHINE, "System\\CurrentControlSet\\Services\\AFD\\Parameters", KEY_READ) != ERROR_SUCCESS or
reg_key.QueryDWORDValue("FastSendDatagramThreshold", threshold) != ERROR_SUCCESS or threshold < mtu
){
UHD_MSG(warning) << boost::format(
"The MTU (%d) is larger than the FastSendDatagramThreshold (%d)!\n"
"This will negatively affect the transmit performance.\n"
"See the transport application notes for more detail.\n"
) % mtu % threshold << std::endl;
warned = true;
}
reg_key.Close();
}
/***********************************************************************
* Static initialization to take care of WSA init and cleanup
**********************************************************************/
struct uhd_wsa_control{
uhd_wsa_control(void){
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData); /*windows socket startup */
}
~uhd_wsa_control(void){
WSACleanup();
}
};
/***********************************************************************
* Reusable managed receiver buffer:
* - Initialize with memory and a release callback.
* - Call get new with a length in bytes to re-use.
**********************************************************************/
class udp_zero_copy_asio_mrb : public managed_recv_buffer{
public:
udp_zero_copy_asio_mrb(void *mem, bounded_buffer &pending):
_mem(mem), _len(0), _pending(pending){/* NOP */}
void release(void){
if (_len == 0) return;
_pending.push_with_haste(this);
_len = 0;
}
sptr get_new(size_t len){
_len = len;
return make_managed_buffer(this);
}
template T cast(void) const{return static_cast(_mem);}
private:
const void *get_buff(void) const{return _mem;}
size_t get_size(void) const{return _len;}
void *_mem;
size_t _len;
bounded_buffer &_pending;
};
/***********************************************************************
* Reusable managed send buffer:
* - committing the buffer calls the asynchronous socket send
* - getting a new buffer performs the blocking wait for completion
**********************************************************************/
class udp_zero_copy_asio_msb : public managed_send_buffer{
public:
udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size):
_sock_fd(sock_fd), _frame_size(frame_size), _committed(false)
{
_wsa_buff.buf = reinterpret_cast(mem);
ZeroMemory(&_overlapped, sizeof(_overlapped));
_overlapped.hEvent = WSACreateEvent();
UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT);
this->commit(0); //makes buffer available via get_new
}
~udp_zero_copy_asio_msb(void){
WSACloseEvent(_overlapped.hEvent);
}
UHD_INLINE void commit(size_t len){
if (_committed) return;
_committed = true;
_wsa_buff.len = len;
if (len == 0) WSASetEvent(_overlapped.hEvent);
else WSASend(_sock_fd, &_wsa_buff, 1, NULL, 0, &_overlapped, NULL);
}
UHD_INLINE sptr get_new(const double timeout, size_t &index){
const DWORD result = WSAWaitForMultipleEvents(
1, &_overlapped.hEvent, true, DWORD(timeout*1000), true
);
if (result == WSA_WAIT_TIMEOUT) return managed_send_buffer::sptr();
index++; //advances the caller's buffer
WSAResetEvent(_overlapped.hEvent);
_committed = false;
_wsa_buff.len = _frame_size;
return make_managed_buffer(this);
}
private:
void *get_buff(void) const{return _wsa_buff.buf;}
size_t get_size(void) const{return _wsa_buff.len;}
int _sock_fd;
const size_t _frame_size;
bool _committed;
WSAOVERLAPPED _overlapped;
WSABUF _wsa_buff;
};
/***********************************************************************
* Zero Copy UDP implementation with WSA:
*
* This is not a true zero copy implementation as each
* send and recv requires a copy operation to/from userspace.
*
* For receive, use a blocking recv() call on the socket.
* This has better performance than the overlapped IO.
* For send, use overlapped IO to submit async sends.
**********************************************************************/
class udp_zero_copy_wsa_impl : public udp_zero_copy{
public:
typedef boost::shared_ptr sptr;
udp_zero_copy_wsa_impl(
const std::string &addr,
const std::string &port,
const device_addr_t &hints
):
_recv_frame_size(size_t(hints.cast("recv_frame_size", udp_simple::mtu))),
_num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_FRAMES))),
_send_frame_size(size_t(hints.cast("send_frame_size", udp_simple::mtu))),
_num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_FRAMES))),
_recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
_send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
_pending_recv_buffs(_num_recv_frames),
_next_send_buff_index(0)
{
check_registry_for_fast_send_threshold(this->get_send_frame_size());
UHD_MSG(status) << boost::format("Creating WSA UDP transport for %s:%s") % addr % port << std::endl;
static uhd_wsa_control uhd_wsa; //makes wsa start happen via lazy initialization
UHD_ASSERT_THROW(_num_send_frames <= WSA_MAXIMUM_WAIT_EVENTS);
//resolve the address
asio::io_service io_service;
asio::ip::udp::resolver resolver(io_service);
asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
//create the socket
_sock_fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (_sock_fd == INVALID_SOCKET){
const DWORD error = WSAGetLastError();
throw uhd::os_error(str(boost::format("WSASocket() failed with error %d") % error));
}
//set the socket non-blocking for recv
u_long mode = 1;
ioctlsocket(_sock_fd, FIONBIO, &mode);
//resize the socket buffers
const int recv_buff_size = int(hints.cast("recv_buff_size", 0.0));
const int send_buff_size = int(hints.cast("send_buff_size", 0.0));
if (recv_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_RCVBUF, (const char *)&recv_buff_size, sizeof(recv_buff_size));
if (send_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_SNDBUF, (const char *)&send_buff_size, sizeof(send_buff_size));
//connect the socket so we can send/recv
const asio::ip::udp::endpoint::data_type &servaddr = *receiver_endpoint.data();
if (WSAConnect(_sock_fd, (const struct sockaddr *)&servaddr, sizeof(servaddr), NULL, NULL, NULL, NULL) != 0){
const DWORD error = WSAGetLastError();
closesocket(_sock_fd);
throw uhd::os_error(str(boost::format("WSAConnect() failed with error %d") % error));
}
//allocate re-usable managed receive buffers
for (size_t i = 0; i < get_num_recv_frames(); i++){
_mrb_pool.push_back(boost::shared_ptr(
new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), _pending_recv_buffs)
));
_pending_recv_buffs.push_with_haste(_mrb_pool.back().get());
}
//allocate re-usable managed send buffers
for (size_t i = 0; i < get_num_send_frames(); i++){
_msb_pool.push_back(boost::shared_ptr(
new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), _sock_fd, get_send_frame_size())
));
}
}
~udp_zero_copy_wsa_impl(void){
closesocket(_sock_fd);
}
/*******************************************************************
* Receive implementation:
*
* Perform a non-blocking receive for performance,
* and then fall back to a blocking receive with timeout.
* Return the managed receive buffer with the new length.
* When the caller is finished with the managed buffer,
* the managed receive buffer is released back into the queue.
******************************************************************/
managed_recv_buffer::sptr get_recv_buff(double timeout){
udp_zero_copy_asio_mrb *mrb = NULL;
if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){
ssize_t ret = ::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0);
if (ret > 0) return mrb->get_new(ret);
if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new(
::recv(_sock_fd, mrb->cast(), _recv_frame_size, 0)
);
_pending_recv_buffs.push_with_haste(mrb); //timeout: return the managed buffer to the queue
}
return managed_recv_buffer::sptr();
}
size_t get_num_recv_frames(void) const {return _num_recv_frames;}
size_t get_recv_frame_size(void) const {return _recv_frame_size;}
/*******************************************************************
* Send implementation:
* Block on the managed buffer's get call and advance the index.
******************************************************************/
managed_send_buffer::sptr get_send_buff(double timeout){
if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
}
size_t get_num_send_frames(void) const {return _num_send_frames;}
size_t get_send_frame_size(void) const {return _send_frame_size;}
private:
//memory management -> buffers and fifos
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
std::vector > _msb_pool;
std::vector > _mrb_pool;
bounded_buffer _pending_recv_buffs;
size_t _next_send_buff_index;
//socket guts
SOCKET _sock_fd;
};
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
udp_zero_copy::sptr udp_zero_copy::make(
    const std::string &addr,
    const std::string &port,
    const device_addr_t &hints
){
    //the WSA implementation is the one behind the generic factory in this file
    udp_zero_copy::sptr transport(new udp_zero_copy_wsa_impl(addr, port, hints));
    return transport;
}