1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
#include <uhd/exception.hpp>
#include <uhd/utils/log.hpp>
#include <uhd/utils/safe_call.hpp>
#include <uhd/utils/thread.hpp>
#include <uhdlib/rfnoc/chdr_ctrl_endpoint.hpp>
#include <uhdlib/rfnoc/chdr_packet.hpp>
#include <uhdlib/rfnoc/chdr_types.hpp>
#include <boost/format.hpp>
#include <atomic>
#include <mutex>
#include <thread>
using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::rfnoc::chdr;
using namespace std::placeholders;
chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;
class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
{
public:
chdr_ctrl_endpoint_impl(const chdr_ctrl_xport_t& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
: _my_epid(my_epid)
, _xport(xport)
, _send_seqnum(0)
, _send_pkt(pkt_factory.make_ctrl())
, _recv_pkt(pkt_factory.make_ctrl())
, _stop_recv_thread(false)
, _recv_thread([this]() { recv_worker(); })
{
const std::string thread_name(str(boost::format("uhd_ctrl_ep%04x") % _my_epid));
uhd::set_thread_name(&_recv_thread, thread_name);
UHD_LOG_DEBUG("RFNOC",
boost::format(
"Started thread %s to process messages control messages on EPID %d")
% thread_name % _my_epid);
}
virtual ~chdr_ctrl_endpoint_impl()
{
UHD_SAFE_CALL(
// Interrupt buffer updater loop
_stop_recv_thread = true;
// Wait for loop to finish
// No timeout on join. The recv loop is guaranteed
// to terminate in a reasonable amount of time because
// there are no timed blocks on the underlying.
_recv_thread.join();
// Flush base transport
while (_xport.recv->get_recv_buff(0.0001)) /*NOP*/;
// Release child endpoints
_endpoint_map.clear(););
}
virtual ctrlport_endpoint::sptr get_ctrlport_ep(sep_id_t dst_epid,
uint16_t dst_port,
size_t buff_capacity,
size_t max_outstanding_async_msgs,
const clock_iface& client_clk,
const clock_iface& timebase_clk)
{
std::lock_guard<std::mutex> lock(_mutex);
ep_map_key_t key{dst_epid, dst_port};
// Function to send a control payload
auto send_fn = [this, dst_epid](const ctrl_payload& payload, double timeout) {
// Build header
chdr_header header;
header.set_pkt_type(PKT_TYPE_CTRL);
header.set_num_mdata(0);
header.set_seq_num(_send_seqnum++);
header.set_dst_epid(dst_epid);
// Acquire send buffer and send the packet
std::lock_guard<std::mutex> lock(_send_mutex);
auto send_buff = _xport.send->get_send_buff(timeout);
_send_pkt->refresh(send_buff->cast<void*>(), header, payload);
send_buff->commit(header.get_length());
};
if (_endpoint_map.find(key) == _endpoint_map.end()) {
ctrlport_endpoint::sptr ctrlport_ep = ctrlport_endpoint::make(send_fn,
_my_epid,
dst_port,
buff_capacity,
max_outstanding_async_msgs,
client_clk,
timebase_clk);
_endpoint_map.insert(std::make_pair(key, ctrlport_ep));
UHD_LOG_DEBUG("RFNOC",
boost::format("Created ctrlport endpoint for port %d on EPID %d")
% dst_port % _my_epid);
return ctrlport_ep;
} else {
return _endpoint_map.at(key);
}
}
virtual size_t get_num_drops() const
{
return _num_drops;
}
private:
void recv_worker()
{
// Run forever:
// - Pull packets from the base transport
// - Route them based on the dst_port
// - Pass them to the ctrlport_endpoint for additional processing
while (not _stop_recv_thread) {
auto buff = _xport.recv->get_recv_buff(0.0);
if (buff) {
std::lock_guard<std::mutex> lock(_mutex);
try {
_recv_pkt->refresh(buff->cast<void*>());
const ctrl_payload payload = _recv_pkt->get_payload();
ep_map_key_t key{payload.src_epid, payload.dst_port};
if (_endpoint_map.find(key) != _endpoint_map.end()) {
_endpoint_map.at(key)->handle_recv(payload);
}
} catch (...) {
// Ignore all errors
}
} else {
// Be a good citizen and yield if no packet is processed
static const size_t MIN_DUR = 1;
boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
// We call sleep(MIN_DUR) above instead of yield() to ensure that we
// relinquish the current scheduler time slot.
// yield() is a hint to the scheduler to end the time
// slice early and schedule in another thread that is ready to run.
// However in most situations, there will be no other thread and
// this thread will continue to run which will rail a CPU core.
// We call sleep(MIN_DUR=1) instead which will sleep for a minimum
// time. Ideally we would like to use boost::chrono::.*seconds::min()
// but that is bound to 0, which causes the sleep_for call to be a
// no-op and thus useless to actually force a sleep.
}
}
}
using ep_map_key_t = std::pair<sep_id_t, uint16_t>;
// The endpoint ID of this software endpoint
const sep_id_t _my_epid;
// Send/recv transports
const chdr_ctrl_xport_t _xport;
// The curent sequence number for a send packet
size_t _send_seqnum = 0;
// The number of packets dropped
size_t _num_drops = 0;
// Packet containers
chdr_ctrl_packet::uptr _send_pkt;
chdr_ctrl_packet::cuptr _recv_pkt;
// A collection of ctrlport endpoints (keyed by the port number)
std::map<ep_map_key_t, ctrlport_endpoint::sptr> _endpoint_map;
// A thread that will handle all responses and async message requests
std::atomic_bool _stop_recv_thread;
std::thread _recv_thread;
// Mutex that protects all state in this class except for _send_pkt
std::mutex _mutex;
// Mutex that protects _send_pkt and _xport.send
std::mutex _send_mutex;
};
chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const chdr_ctrl_xport_t& xport,
const chdr::chdr_packet_factory& pkt_factory,
sep_id_t my_epid)
{
return std::make_unique<chdr_ctrl_endpoint_impl>(xport, pkt_factory, my_epid);
}
|