1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
#include <uhd/config.hpp>
#include <uhd/utils/log.hpp>
#include <uhd/utils/static.hpp>
#include <uhdlib/transport/adapter.hpp>
#include <uhdlib/transport/dpdk/udp.hpp>
#include <uhdlib/transport/udp_dpdk_link.hpp>
#include <arpa/inet.h>
#include <memory>
using namespace uhd::transport;
using namespace uhd::transport::dpdk;
udp_dpdk_link::udp_dpdk_link(dpdk::port_id_t port_id,
const std::string& remote_addr,
const std::string& remote_port,
const std::string& local_port,
const link_params_t& params)
: _num_recv_frames(params.num_recv_frames)
, _recv_frame_size(params.recv_frame_size)
, _num_send_frames(params.num_send_frames)
, _send_frame_size(params.send_frame_size)
{
// Get a reference to the context, since this class manages DPDK memory
_ctx = dpdk_ctx::get();
UHD_ASSERT_THROW(_ctx);
// Fill in remote IPv4 address and UDP port
// NOTE: Remote MAC address is filled in later by I/O service
int status = inet_pton(AF_INET, remote_addr.c_str(), &_remote_ipv4);
if (status != 1) {
UHD_LOG_ERROR("DPDK", std::string("Invalid destination address ") + remote_addr);
throw uhd::runtime_error(
std::string("DPDK: Invalid destination address ") + remote_addr);
}
_remote_port = rte_cpu_to_be_16(std::stoul(remote_port));
// Grab the port with a route to the remote host
_port = _ctx->get_port(port_id);
uint16_t local_port_num = rte_cpu_to_be_16(std::stoul(local_port));
// Get an unused UDP port for listening
_local_port = _port->alloc_udp_port(local_port_num);
// Validate params
const size_t max_frame_size = _port->get_mtu() - dpdk::HDR_SIZE_UDP_IPV4;
UHD_ASSERT_THROW(params.send_frame_size <= max_frame_size);
UHD_ASSERT_THROW(params.recv_frame_size <= max_frame_size);
// Register the adapter
auto info = _port->get_adapter_info();
auto& adap_ctx = adapter_ctx::get();
_adapter_id = adap_ctx.register_adapter(info);
UHD_LOGGER_TRACE("DPDK") << boost::format("Created udp_dpdk_link to (%s:%s)")
% remote_addr % remote_port;
UHD_LOGGER_TRACE("DPDK")
<< boost::format("num_recv_frames=%d, recv_frame_size=%d, num_send_frames=%d, "
"send_frame_size=%d")
% params.num_recv_frames % params.recv_frame_size % params.num_send_frames
% params.send_frame_size;
}
udp_dpdk_link::~udp_dpdk_link() {}
udp_dpdk_link::sptr udp_dpdk_link::make(const std::string& remote_addr,
const std::string& remote_port,
const link_params_t& params)
{
auto ctx = dpdk::dpdk_ctx::get();
auto port = ctx->get_route(remote_addr);
if (!port) {
UHD_LOG_ERROR("DPDK",
std::string("Could not find route to destination address ") + remote_addr);
throw uhd::runtime_error(
std::string("DPDK: Could not find route to destination address ")
+ remote_addr);
}
return make(port->get_port_id(), remote_addr, remote_port, "0", params);
}
udp_dpdk_link::sptr udp_dpdk_link::make(const dpdk::port_id_t port_id,
const std::string& remote_addr,
const std::string& remote_port,
const std::string& local_port,
const link_params_t& params)
{
UHD_ASSERT_THROW(params.recv_frame_size > 0);
UHD_ASSERT_THROW(params.send_frame_size > 0);
UHD_ASSERT_THROW(params.num_send_frames > 0);
UHD_ASSERT_THROW(params.num_recv_frames > 0);
return std::make_shared<udp_dpdk_link>(
port_id, remote_addr, remote_port, local_port, params);
}
void udp_dpdk_link::enqueue_recv_mbuf(struct rte_mbuf* mbuf)
{
// Get packet size
struct udp_hdr* hdr = rte_pktmbuf_mtod_offset(
mbuf, struct udp_hdr*, sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr));
size_t packet_size = rte_be_to_cpu_16(hdr->dgram_len) - sizeof(struct udp_hdr);
// Prepare the dpdk_frame_buff
auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf);
buff->header_jump(
sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr));
buff->set_packet_size(packet_size);
// Add the dpdk_frame_buff to the list
if (_recv_buff_head) {
buff->prev = _recv_buff_head->prev;
buff->next = _recv_buff_head;
_recv_buff_head->prev->next = buff;
_recv_buff_head->prev = buff;
} else {
_recv_buff_head = buff;
buff->next = buff;
buff->prev = buff;
}
}
frame_buff::uptr udp_dpdk_link::get_recv_buff(int32_t /*timeout_ms*/)
{
auto buff = _recv_buff_head;
if (buff) {
if (_recv_buff_head->next == buff) {
/* Only had the one buff, so the list is empty */
_recv_buff_head = nullptr;
} else {
/* Make the next buff the new list head */
_recv_buff_head->next->prev = _recv_buff_head->prev;
_recv_buff_head->prev->next = _recv_buff_head->next;
_recv_buff_head = _recv_buff_head->next;
}
buff->next = nullptr;
buff->prev = nullptr;
return frame_buff::uptr(buff);
}
return frame_buff::uptr();
}
void udp_dpdk_link::release_recv_buff(frame_buff::uptr buff)
{
dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release();
assert(buff_ptr);
rte_pktmbuf_free(buff_ptr->get_pktmbuf());
}
frame_buff::uptr udp_dpdk_link::get_send_buff(int32_t /*timeout_ms*/)
{
auto mbuf = rte_pktmbuf_alloc(_port->get_tx_pktbuf_pool());
if (mbuf) {
auto buff = new (rte_mbuf_to_priv(mbuf)) dpdk_frame_buff(mbuf);
buff->header_jump(
sizeof(struct ether_hdr) + sizeof(struct ipv4_hdr) + sizeof(struct udp_hdr));
return frame_buff::uptr(buff);
}
return frame_buff::uptr();
}
void udp_dpdk_link::release_send_buff(frame_buff::uptr buff)
{
dpdk_frame_buff* buff_ptr = (dpdk_frame_buff*)buff.release();
assert(buff_ptr);
auto mbuf = buff_ptr->get_pktmbuf();
if (buff_ptr->packet_size()) {
// Fill in L2 header
auto local_mac = _port->get_mac_addr();
struct ether_hdr* l2_hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr*);
ether_addr_copy(&_remote_mac, &l2_hdr->d_addr);
ether_addr_copy(&local_mac, &l2_hdr->s_addr);
l2_hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_IPv4);
// Fill in L3 and L4 headers
dpdk::fill_udp_hdr(mbuf,
_port,
_remote_ipv4,
_local_port,
_remote_port,
buff_ptr->packet_size());
// Prepare the packet buffer and send it out
int status = rte_eth_tx_prepare(_port->get_port_id(), _queue, &mbuf, 1);
if (status != 1) {
throw uhd::runtime_error("DPDK: Failed to prepare TX buffer for send");
}
status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1);
while (status != 1) {
status = rte_eth_tx_burst(_port->get_port_id(), _queue, &mbuf, 1);
// FIXME: Should we make available retrying?
// throw uhd::runtime_error("DPDK: Failed to send TX buffer");
}
} else {
// Release the buffer if there is nothing in it
rte_pktmbuf_free(mbuf);
}
}
|