1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
//
// Copyright 2010 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
#include <uhd/transport/udp_zero_copy.hpp>
#include <boost/thread.hpp>
#include <boost/format.hpp>
using namespace uhd::transport;
/***********************************************************************
* Smart buffer implementation for udp zerocopy none
*
* This smart buffer implementation houses a const buffer.
* When the smart buffer is deleted, the buffer is freed.
* The memory in the const buffer is allocated with new [],
* and so the destructor frees the buffer with delete [].
**********************************************************************/
class smart_buffer_impl : public smart_buffer{
public:
    //! Wrap the memory described by buff; this object frees it on destruction.
    smart_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){
        /* NOP */
    }

    //! Release the wrapped memory with delete [] (it is treated as a uint32_t array).
    ~smart_buffer_impl(void){
        const uint32_t *mem = boost::asio::buffer_cast<const uint32_t *>(_buff);
        delete [] mem;
    }

    //! Read-only access to the wrapped buffer.
    const boost::asio::const_buffer &get(void) const{
        return _buff;
    }

private:
    //pointer + length of the memory released in the destructor
    boost::asio::const_buffer _buff;
};
/***********************************************************************
* UDP zero copy implementation class
*
* This is the portable zero copy implementation for systems
* where a faster, platform specific solution is not available.
*
* It uses boost asio udp sockets and the standard recv() call,
* and, in fact, does not actually perform a zero-copy receive.
**********************************************************************/
class udp_zero_copy_impl : public udp_zero_copy{
public:
    //! Create a UDP socket connected to the endpoint resolved from addr and port.
    udp_zero_copy_impl(const std::string &addr, const std::string &port);

    //! Destroy the socket created by the constructor.
    ~udp_zero_copy_impl(void);

    //! Send the contents of buff over the socket; returns the socket's send() result.
    size_t send(const boost::asio::const_buffer &buff);

    //! Wait briefly for data and return it wrapped in a smart buffer.
    smart_buffer::sptr recv(void);

private:
    //the socket is heap allocated in the constructor and deleted in the destructor
    boost::asio::ip::udp::socket *_socket;
    boost::asio::io_service _io_service;

    //helpers to query and change the socket's receive buffer size option
    size_t get_recv_buff_size(void);
    void set_recv_buff_size(size_t);
};
/*!
 * Resolve addr/port to an IPv4 UDP endpoint, then create, open and connect
 * the socket. Afterwards request a large receive buffer and report the size
 * that was actually granted.
 *
 * \param addr the host name or address to resolve
 * \param port the port or service name to resolve
 *
 * Any exception thrown while resolving or setting up the socket propagates
 * to the caller; the socket is released first so that it does not leak.
 */
udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){
    //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;

    // resolve the address
    boost::asio::ip::udp::resolver resolver(_io_service);
    boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
    boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);

    // Create the socket. The destructor does not run when a constructor
    // throws, so everything after this point is guarded to free the socket.
    _socket = new boost::asio::ip::udp::socket(_io_service);
    try{
        // open and connect the socket
        _socket->open(boost::asio::ip::udp::v4());
        _socket->connect(receiver_endpoint);

        // set the rx socket buffer size:
        // pick a huge size, and deal with whatever we get
        const size_t requested_buff_size = 54321000; //some big number!
        const size_t small_buff_threshold = 100000;  //below this, print a hint
        set_recv_buff_size(requested_buff_size);
        const size_t current_buff_size = get_recv_buff_size();

        std::cout << boost::format(
            "Current rx socket buffer size: %d\n"
        ) % current_buff_size;
        if (current_buff_size < small_buff_threshold) std::cout << boost::format(
            "Adjust max rx socket buffer size (linux only):\n"
            " sysctl -w net.core.rmem_max=VALUE\n"
        );
    }
    catch(...){
        delete _socket;
        throw;
    }
}
/*!
 * Free the socket that was allocated with new in the constructor.
 */
udp_zero_copy_impl::~udp_zero_copy_impl(void){
    boost::asio::ip::udp::socket *const owned_socket = _socket;
    delete owned_socket;
}
/*!
 * Send the contents of buff over the connected socket.
 *
 * \param buff the memory to send
 * \return the value returned by the socket's send() call
 */
size_t udp_zero_copy_impl::send(const boost::asio::const_buffer &buff){
    const size_t num_bytes_sent = _socket->send(boost::asio::buffer(buff));
    return num_bytes_sent;
}
/*!
 * Wait up to 50 ms (polling once per millisecond) for data to become
 * available on the socket, then receive it into freshly allocated memory.
 *
 * \return a smart buffer that owns the received data; the buffer has
 * zero length when nothing became available before the timeout
 *
 * Exceptions from the socket or from allocation propagate to the caller;
 * the receive memory is freed first so that it does not leak.
 */
smart_buffer::sptr udp_zero_copy_impl::recv(void){
    size_t available = 0;

    //implement timeout through polling and sleeping
    boost::asio::deadline_timer timer(_socket->get_io_service());
    timer.expires_from_now(boost::posix_time::milliseconds(50));
    while (not ((available = _socket->available()) or timer.expires_from_now().is_negative())){
        boost::this_thread::sleep(boost::posix_time::milliseconds(1));
    }

    //allocate memory: round the word count up so that the array always
    //covers all available bytes, even when that is not a multiple of 4
    const size_t num_words = (available + sizeof(uint32_t) - 1)/sizeof(uint32_t);
    uint32_t *buff_mem = new uint32_t[num_words];

    //until the smart buffer exists, this function owns buff_mem
    smart_buffer_impl *owner = 0;
    try{
        //receive only if data is available
        size_t num_received = 0;
        if (available){
            boost::asio::mutable_buffer buff(buff_mem, available);
            num_received = _socket->receive(boost::asio::buffer(buff));
        }

        //create a new smart buffer to house the data,
        //sized by what was actually received
        owner = new smart_buffer_impl(boost::asio::const_buffer(buff_mem, num_received));
    }
    catch(...){
        delete [] buff_mem;
        throw;
    }

    //from here on the smart buffer frees the memory in its destructor
    return smart_buffer::sptr(owner);
}
size_t udp_zero_copy_impl::get_recv_buff_size(void){
boost::asio::socket_base::receive_buffer_size option;
_socket->get_option(option);
return option.value();
}
void udp_zero_copy_impl::set_recv_buff_size(size_t new_size){
boost::asio::socket_base::receive_buffer_size option(new_size);
_socket->set_option(option);
}
/***********************************************************************
* UDP zero copy make function
**********************************************************************/
/*!
 * Construct the portable UDP transport implementation for addr and port
 * and hand it back through the interface's sptr type.
 */
udp_zero_copy::sptr udp_zero_copy::make(
    const std::string &addr, const std::string &port
){
    udp_zero_copy::sptr transport(new udp_zero_copy_impl(addr, port));
    return transport;
}
|