1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
#include <uhd/transport/frame_buff.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/transport/dpdk/udp.hpp>
#include <uhdlib/transport/dpdk_io_service.hpp>
#include <uhdlib/transport/dpdk_io_service_client.hpp>
#include <uhdlib/transport/dpdk_simple.hpp>
#include <uhdlib/transport/links.hpp>
#include <uhdlib/transport/udp_dpdk_link.hpp>
#include <arpa/inet.h>
namespace uhd { namespace transport {
namespace {
constexpr double SEND_TIMEOUT_MS = 500; // seconds
}
class dpdk_simple_impl : public dpdk_simple
{
public:
dpdk_simple_impl(const std::string& addr, const std::string& port)
{
link_params_t link_params = _get_default_link_params();
_link = uhd::transport::udp_dpdk_link::make(addr, port, link_params);
UHD_LOG_TRACE("DPDK::SIMPLE",
"Creating simple UDP object for " << addr << ":" << port
<< ", DPDK port index "
<< _link->get_port()->get_port_id());
// The context must be initialized, or we'd never get here
auto ctx = uhd::transport::dpdk::dpdk_ctx::get();
UHD_ASSERT_THROW(ctx->is_init_done());
// Init I/O service
_port_id = _link->get_port()->get_port_id();
_io_service = ctx->get_io_service(_port_id);
// This is normally done by the I/O service manager, but with DPDK, this
// is all it does so we skip that step
UHD_LOG_TRACE("DPDK::SIMPLE", "Attaching link to I/O service...");
_io_service->attach_recv_link(_link);
_io_service->attach_send_link(_link);
auto recv_cb = [this](buff_t::uptr& buff, recv_link_if*, send_link_if*) {
return this->_recv_callback(buff);
};
auto fc_cb = [this](buff_t::uptr buff, recv_link_if*, send_link_if*) {
this->_recv_fc_callback(std::move(buff));
};
_recv_io = _io_service->make_recv_client(_link,
link_params.num_recv_frames,
recv_cb,
nullptr, // No send/fc link
0, // No send frames
fc_cb);
auto send_cb = [this](buff_t::uptr buff, transport::send_link_if*) {
this->_send_callback(std::move(buff));
};
_send_io = _io_service->make_send_client(_link,
link_params.num_send_frames,
send_cb,
nullptr, // no FC link
0,
nullptr, // No receive callback necessary
[](const size_t) { return true; } // We can always send
);
UHD_LOG_TRACE("DPDK::SIMPLE", "Constructor complete");
}
~dpdk_simple_impl(void)
{
UHD_LOG_TRACE("DPDK::SIMPLE",
"~dpdk_simple_impl(), DPDK port index " << _link->get_port()->get_port_id());
// Disconnect the clients from the I/O service
_send_io.reset();
_recv_io.reset();
// Disconnect the link from the I/O service
_io_service->detach_recv_link(_link);
_io_service->detach_send_link(_link);
}
/*! Send and release outstanding buffer
*
* \param length bytes of data to send
* \return number of bytes sent (releases buffer if sent)
*/
size_t send(const boost::asio::const_buffer& user_buff)
{
// Extract buff and sanity check
const size_t nbytes = boost::asio::buffer_size(user_buff);
UHD_ASSERT_THROW(nbytes <= _link->get_send_frame_size())
const uint8_t* user_data = boost::asio::buffer_cast<const uint8_t*>(user_buff);
// Get send buff
auto buff = _send_io->get_send_buff(SEND_TIMEOUT_MS);
UHD_ASSERT_THROW(buff);
buff->set_packet_size(nbytes);
std::memcpy(buff->data(), user_data, nbytes);
// Release send buff (send the packet)
_send_io->release_send_buff(std::move(buff));
return nbytes;
}
/*! Receive a single packet.
*
* Buffer provided by transport (must be freed before next operation).
*
* \param buf a pointer to place to write buffer location
* \param timeout the timeout in seconds
* \return the number of bytes received or zero on timeout
*/
size_t recv(const boost::asio::mutable_buffer& user_buff, double timeout)
{
size_t user_buff_size = boost::asio::buffer_size(user_buff);
uint8_t* user_data = boost::asio::buffer_cast<uint8_t*>(user_buff);
auto buff = _recv_io->get_recv_buff(static_cast<int32_t>(timeout * 1e3));
if (!buff) {
return 0;
}
// Extract the sender's address. This is only possible because we know
// the memory layout of the buff
struct udp_hdr* udp_hdr_end = (struct udp_hdr*)buff->data();
struct ipv4_hdr* ip_hdr_end = (struct ipv4_hdr*)(&udp_hdr_end[-1]);
struct ipv4_hdr* ip_hdr = (struct ipv4_hdr*)(&ip_hdr_end[-1]);
_last_recv_addr = ip_hdr->src_addr;
// Extract the buffer data
const size_t copy_len = std::min(user_buff_size, buff->packet_size());
if (copy_len < buff->packet_size()) {
UHD_LOG_WARNING("DPDK", "Truncating recv packet");
}
std::memcpy(user_data, buff->data(), copy_len);
// Housekeeping
_recv_io->release_recv_buff(std::move(buff));
return copy_len;
}
std::string get_recv_addr(void)
{
return dpdk::ipv4_num_to_str(_last_recv_addr);
}
std::string get_send_addr(void)
{
return dpdk::ipv4_num_to_str(_link->get_remote_ipv4());
}
private:
using buff_t = frame_buff;
link_params_t _get_default_link_params()
{
link_params_t link_params;
link_params.recv_frame_size = 8000;
link_params.send_frame_size = 8000;
link_params.num_recv_frames = 1;
link_params.num_send_frames = 1;
link_params.recv_buff_size = 8000;
link_params.send_buff_size = 8000;
return link_params;
}
void _send_callback(buff_t::uptr buff)
{
_link->release_send_buff(std::move(buff));
}
bool _recv_callback(buff_t::uptr&)
{
// Queue it up
return true;
}
void _recv_fc_callback(buff_t::uptr buff)
{
_link->release_recv_buff(std::move(buff));
}
/*** Attributes **********************************************************/
unsigned int _port_id;
uint32_t _last_recv_addr;
udp_dpdk_link::sptr _link;
dpdk_io_service::sptr _io_service;
send_io_if::sptr _send_io;
recv_io_if::sptr _recv_io;
};
dpdk_simple::~dpdk_simple(void) {}
/***********************************************************************
* DPDK simple transport public make functions
**********************************************************************/
udp_simple::sptr dpdk_simple::make_connected(
const std::string& addr, const std::string& port)
{
return udp_simple::sptr(new dpdk_simple_impl(addr, port));
}
// For DPDK, this is not special and the same as make_connected
udp_simple::sptr dpdk_simple::make_broadcast(
const std::string& addr, const std::string& port)
{
return udp_simple::sptr(new dpdk_simple_impl(addr, port));
}
}} // namespace uhd::transport
|