//
// Copyright 2019 Ettus Research, a National Instruments Brand
//
// SPDX-License-Identifier: GPL-3.0-or-later
//
#include <uhd/exception.hpp>
#include <uhd/rfnoc/chdr_types.hpp>
#include <uhd/utils/log.hpp>
#include <uhdlib/rfnoc/chdr_packet_writer.hpp>
#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <numeric>
#include <set>
#include <tuple>
using namespace uhd;
using namespace uhd::rfnoc;
using namespace uhd::rfnoc::chdr;
using namespace std::chrono;
using namespace std::chrono_literals;
namespace {

//! Size of the largest async message (a CTRL_WRITE) in 32-bit words:
//  2 header words + 2 timestamp words + 1 op-word + 1 data word
constexpr size_t ASYNC_MESSAGE_SIZE{6};

//! Completion timeout (in seconds) applied to transactions by default
constexpr double DEFAULT_TIMEOUT{1.0};

//! Extended timeout (in seconds) used while a timed command is outstanding
constexpr double MASSIVE_TIMEOUT{10.0};

//! Default for the policy flag that makes every request wait for its ACK
constexpr bool DEFAULT_FORCE_ACKS{false};

} // namespace
//! Out-of-line, defaulted definition of the ctrlport_endpoint interface
//  destructor
ctrlport_endpoint::~ctrlport_endpoint() = default;
class ctrlport_endpoint_impl : public ctrlport_endpoint
{
public:
//! Constructs the endpoint by storing the send function, the addressing
//  information, the flow-control limits and the clock references.
//
// \param send_fcn Function used to send control packets (stored as a copy)
// \param my_epid Endpoint ID written into src_epid of outgoing packets
// \param local_port Port number used as both src_port and dst_port of
//        outgoing requests, and expected as dst_port of async messages
// \param buff_capacity Downstream buffer capacity in 32-bit words
// \param max_outstanding_async_msgs Number of async messages for which
//        buffer space is held back from regular requests
// \param client_clk Clock that must be running before any request is sent
// \param timebase_clk Clock whose frequency converts time specs into ticks
//
// NOTE: both clocks are stored as references, so the referenced objects
// must outlive this endpoint.
ctrlport_endpoint_impl(const send_fn_t& send_fcn,
sep_id_t my_epid,
uint16_t local_port,
size_t buff_capacity,
size_t max_outstanding_async_msgs,
const clock_iface& client_clk,
const clock_iface& timebase_clk)
: _handle_send(send_fcn)
, _my_epid(my_epid)
, _local_port(local_port)
, _buff_capacity(buff_capacity)
, _max_outstanding_async_msgs(max_outstanding_async_msgs)
, _client_clk(client_clk)
, _timebase_clk(timebase_clk)
{
}
//! Defaulted destructor; no explicit cleanup is performed here
~ctrlport_endpoint_impl() override = default;
//! Writes one 32-bit word to a register by issuing an OP_WRITE request
//
// \param addr Address to write to
// \param data Value to write
// \param timestamp Time at which to execute the write (ASAP: untimed)
// \param ack If true, block until the ACK for this write was received
void poke32(uint32_t addr,
    uint32_t data,
    uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
    bool ack = false) override
{
    // A single write carries exactly one data word
    const std::vector<uint32_t> write_data{data};
    send_request_packet(OP_WRITE, addr, write_data, timestamp, ack);
}
//! Writes data[i] to addrs[i] for every index, one poke32() per element
//
// Only the first write carries \p timestamp and only the last write honours
// \p ack; all other writes are untimed and do not wait for an ACK.
//
// \throws uhd::value_error if addrs and data differ in length
void multi_poke32(const std::vector<uint32_t> addrs,
    const std::vector<uint32_t> data,
    uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
    bool ack = false) override
{
    const size_t num_writes = data.size();
    if (addrs.size() != num_writes) {
        throw uhd::value_error("addrs and data vectors must be of the same length");
    }
    for (size_t idx = 0; idx < num_writes; ++idx) {
        const bool is_first = (idx == 0);
        const bool is_last  = (idx + 1 == num_writes);
        poke32(addrs[idx],
            data[idx],
            is_first ? timestamp : uhd::time_spec_t::ASAP,
            is_last && ack);
    }
}
//! Writes a run of 32-bit words to consecutive addresses starting at
//  first_addr, one poke32() per word (addresses advance by 4 bytes)
//
// Only the first write carries \p timestamp and only the last write honours
// \p ack; all other writes are untimed and do not wait for an ACK.
void block_poke32(uint32_t first_addr,
    const std::vector<uint32_t> data,
    uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
    bool ack = false) override
{
    const size_t num_words = data.size();
    for (size_t idx = 0; idx < num_words; ++idx) {
        const bool is_first = (idx == 0);
        const bool is_last  = (idx + 1 == num_words);
        poke32(first_addr + (idx * sizeof(uint32_t)),
            data[idx],
            is_first ? timestamp : uhd::time_spec_t::ASAP,
            is_last && ack);
    }
    /* TODO: Use this instead once the atomic block poke is implemented in
             the FPGA
    // Send request and optionally wait for an ACK
    send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp, ack);
    */
}
uint32_t peek32(
uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) override
{
// Send request and wait for an ACK
boost::optional<ctrl_payload> response;
std::tie(std::ignore, response) =
send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp);
return response.get().data_vtr[0];
}
//! Reads a run of 32-bit words from consecutive addresses starting at
//  first_addr, one peek32() per word (addresses advance by 4 bytes)
//
// Only the first read carries \p timestamp; the others are untimed.
//
// \param first_addr Address of the first word
// \param length Number of 32-bit words to read
// \param timestamp Time at which to execute the first read
// \returns The words read, in address order
std::vector<uint32_t> block_peek32(uint32_t first_addr,
    size_t length,
    uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) override
{
    std::vector<uint32_t> values;
    // The final size is known up front, so allocate once
    values.reserve(length);
    for (size_t i = 0; i < length; i++) {
        values.push_back(peek32(first_addr + (i * sizeof(uint32_t)),
            (i == 0) ? timestamp : uhd::time_spec_t::ASAP));
    }
    return values;
    /* TODO: Use this instead once the atomic block peek is implemented in
             the FPGA
    // Send request and wait for an ACK
    boost::optional<ctrl_payload> response;
    std::tie(std::ignore, response) = send_request_packet(OP_READ,
        first_addr,
        std::vector<uint32_t>(length, 0),
        timestamp);
    return response.get().data_vtr;
    */
}
//! Control poll operation -- currently always throws.
//
// The code after the throw would issue an OP_POLL request carrying data,
// mask and the timeout converted to ticks of the timebase clock, but it is
// unreachable for now.
//
// \throws uhd::not_implemented_error always
void poll32(uint32_t addr,
uint32_t data,
uint32_t mask,
uhd::time_spec_t timeout,
uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
bool ack = false) override
{
// TODO: Remove this throw once control poll is implemented in the FPGA.
// Until then, everything below it is unreachable.
throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
// Send request and optionally wait for an ACK
send_request_packet(OP_POLL,
addr,
{data,
mask,
static_cast<uint32_t>(timeout.to_ticks(_timebase_clk.get_freq()))},
timestamp,
ack);
}
//! Issues an OP_SLEEP request for the given duration
//
// The duration is converted to ticks of the timebase clock and truncated to
// 32 bits. The request itself is always untimed.
//
// \param duration How long to sleep
// \param ack If true, block until the ACK for this request was received
void sleep(uhd::time_spec_t duration, bool ack = false) override
{
    const auto tick_rate   = _timebase_clk.get_freq();
    const auto sleep_ticks = static_cast<uint32_t>(duration.to_ticks(tick_rate));
    send_request_packet(OP_SLEEP, 0, {sleep_ticks}, uhd::time_spec_t::ASAP, ack);
}
//! Installs the callback used to decide whether an incoming async message
//  is valid; replaces any previously installed validator
void register_async_msg_validator(async_msg_validator_t callback_f) override
{
    const std::lock_guard<std::mutex> guard(_mutex);
    _validate_async_msg = callback_f;
}
//! Installs the callback that is invoked for every validated async message;
//  replaces any previously installed handler
void register_async_msg_handler(async_msg_callback_t callback_f) override
{
    const std::lock_guard<std::mutex> guard(_mutex);
    _handle_async_msg = callback_f;
}
//! Selects the policy that governs timeouts and ACK behaviour
//
// Only the "default" policy is available: it takes the timeout from the
// "timeout" key of \p args (falling back to DEFAULT_TIMEOUT) and resets
// force_acks to DEFAULT_FORCE_ACKS.
//
// \param name Policy name
// \param args Policy arguments
// \throws uhd::not_implemented_error for any policy other than "default"
void set_policy(const std::string& name, const uhd::device_addr_t& args) override
{
    std::unique_lock<std::mutex> lock(_mutex);
    if (name == "default") {
        _policy.timeout    = args.cast<double>("timeout", DEFAULT_TIMEOUT);
        _policy.force_acks = DEFAULT_FORCE_ACKS;
    } else {
        // TODO: Replace with real handling when custom policies are
        // implemented
        throw uhd::not_implemented_error(
            "Policy '" + name + "' is not implemented");
    }
}
void handle_recv(const ctrl_payload& rx_ctrl) override
{
if (rx_ctrl.is_ack) {
// Function to process a response with no sequence errors
auto process_correct_response = [this, rx_ctrl]() {
response_status_t resp_status = RESP_VALID;
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
_buff_free_cond.notify_one();
if (get_payload_size(_req_queue.front()) != get_payload_size(rx_ctrl)) {
resp_status = RESP_SIZEERR;
}
// Pop the request from the queue
_req_queue.pop_front();
// Push the response into the response queue if and only if
// the client wanted an ACK for this packet
wanted_ack_key request_key{
rx_ctrl.seq_num, rx_ctrl.op_code, rx_ctrl.address};
if (_wanted_acks.count(request_key)) {
_wanted_acks.erase(request_key);
_resp_queue.push_back(std::make_tuple(rx_ctrl, resp_status));
_resp_ready_cond.notify_all();
} else {
// If the client didn't want an ACK for this request, but
// the response indicated an error, then provide some
// feedback
if (rx_ctrl.status != CMD_OKAY) {
log_response_packet_error(rx_ctrl);
}
}
};
// Function to process a response with sequence errors
auto process_incorrect_response = [this]() {
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
_buff_free_cond.notify_one();
// Push a fabricated response into the response queue only
// if the client was waiting on an ACK for the original
// request whose response was lost
ctrl_payload resp(_req_queue.front());
wanted_ack_key request_key{resp.seq_num, resp.op_code, resp.address};
if (_wanted_acks.count(request_key)) {
_wanted_acks.erase(request_key);
resp.is_ack = true;
_resp_queue.push_back(std::make_tuple(resp, RESP_DROPPED));
_resp_ready_cond.notify_all();
} else {
// If the client did not want an ACK for this request, but
// we fabricated a response due to a sequence error, then
// provide feedback
log_dropped_packet(resp);
}
// Pop the request from the queue
_req_queue.pop_front();
};
// Peek at the request queue to check the expected sequence number
std::unique_lock<std::mutex> lock(_mutex);
int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num);
if (seq_num_diff == 0) { // No sequence error
process_correct_response();
} else if (seq_num_diff > 0) { // Packet(s) dropped
// Tag all dropped packets
for (int8_t i = 0; i < seq_num_diff; i++) {
process_incorrect_response();
}
// Process correct response
process_correct_response();
} else { // Reordered packet(s)
// Requests are processed in order. If seq_num_diff is negative then we
// have either already seen this response or we have dropped >128
// responses. Either way ignore this packet.
}
} else {
// Handle asynchronous message callback
ctrl_status_t status = CMD_CMDERR;
if (rx_ctrl.op_code != OP_WRITE && rx_ctrl.op_code != OP_BLOCK_WRITE) {
UHD_LOG_ERROR(
"CTRLEP", "Malformed async message request: Invalid opcode");
} else if (rx_ctrl.dst_port != _local_port) {
UHD_LOG_ERROR("CTRLEP",
"Malformed async message request: Invalid port "
<< rx_ctrl.dst_port << ", expected my local port "
<< _local_port);
} else if (rx_ctrl.data_vtr.empty()) {
UHD_LOG_ERROR(
"CTRLEP", "Malformed async message request: Invalid num_data");
} else {
if (!_validate_async_msg(rx_ctrl.address, rx_ctrl.data_vtr)) {
UHD_LOG_ERROR("CTRLEP",
"Malformed async message request: Async message was not "
"validated by block controller!");
} else {
status = CMD_OKAY;
}
}
try {
// Respond with an ACK packet
// Flow control not needed because we have allocated special room in the
// buffer for async message responses
ctrl_payload tx_ctrl(rx_ctrl);
tx_ctrl.is_ack = true;
tx_ctrl.src_epid = _my_epid;
tx_ctrl.status = status;
const auto timeout = [&]() {
std::unique_lock<std::mutex> lock(_mutex);
return _policy.timeout;
}();
_handle_send(tx_ctrl, timeout);
} catch (...) {
UHD_LOG_ERROR("CTRLEP",
"Encountered an error sending a response for an async message");
return;
}
if (status == CMD_OKAY) {
try {
_handle_async_msg(
rx_ctrl.address, rx_ctrl.data_vtr, rx_ctrl.timestamp);
} catch (const std::exception& ex) {
UHD_LOG_ERROR("CTRLEP",
"Caught exception during async message handling: " << ex.what());
} catch (...) {
UHD_LOG_ERROR("CTRLEP",
"Caught unknown exception during async message handling!");
}
}
}
}
//! Returns the endpoint ID that was passed to the constructor
uint16_t get_src_epid() const override
{
// _my_epid is a const member, so reading it does not require the mutex
return _my_epid;
}
//! Returns the local port number that was passed to the constructor
uint16_t get_port_num() const override
{
// _local_port is a const member, so reading it does not require the mutex
return _local_port;
}
private:
//! The software status (different from the transaction status) of the response
//
// RESP_VALID:   the response matched the oldest outstanding request
// RESP_DROPPED: no response arrived; the entry was fabricated from the request
// RESP_RTERR:   reported as a routing error by validate_ack()
// RESP_SIZEERR: request and response payload sizes differ
enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
//! Computes the length of a control payload in 32-bit words
//
// Two words are always counted, two more are added if the payload carries a
// timestamp, plus one word per data element.
inline static size_t get_payload_size(const ctrl_payload& payload)
{
    size_t num_words = 2;
    if (payload.timestamp.is_initialized()) {
        num_words += 2;
    }
    num_words += payload.data_vtr.size();
    return num_words;
}
//! Marks the start of a timeout for an operation and returns the expiration
//  time
//
// \param duration Timeout in seconds; rounded up to whole microseconds
// \returns The steady_clock time point at which the timeout expires
inline const std::chrono::steady_clock::time_point start_timeout(double duration)
{
    // Use the 64-bit microsecond tick type: converting to int would
    // overflow for durations of roughly 2147 seconds and above
    const auto timeout_us = static_cast<std::chrono::microseconds::rep>(
        std::ceil(duration / 1e-6));
    return std::chrono::steady_clock::now()
           + std::chrono::microseconds(timeout_us);
}
//! Tells whether any outstanding request in the request queue carries a
//  timestamp
bool check_timed_in_queue() const
{
    bool found_timed = false;
    // Stop scanning as soon as the first timed request is seen
    for (auto req = _req_queue.cbegin(); req != _req_queue.cend() && !found_timed;
         ++req) {
        found_timed = req->has_timestamp();
    }
    return found_timed;
}
//! Sends a request control packet to a remote device, optionally waiting
//  for an ACK, and returns any response if applicable
//
// \param op_code Operation to request
// \param address Address the operation applies to
// \param data_vtr Data words of the request
// \param time_spec Execution time (ASAP: no timestamp is attached)
// \param require_ack If true (or if the policy forces ACKs), block until
//        the ACK for this request was received
// \returns The request that was sent, and the response if an ACK was
//          awaited (otherwise an empty optional)
// \throws uhd::system_error if the client clock, or -- for timed requests --
//         the timebase clock is not running
// \throws uhd::op_timeout if no buffer space or no ACK became available in
//         time
// Exceptions from the send function and from ACK validation are propagated.
const std::pair<ctrl_payload, boost::optional<ctrl_payload>> send_request_packet(
    ctrl_opcode_t op_code,
    uint32_t address,
    const std::vector<uint32_t>& data_vtr,
    const uhd::time_spec_t& time_spec,
    const bool require_ack = true)
{
    if (!_client_clk.is_running()) {
        throw uhd::system_error("Ctrlport client clock is not running");
    }
    // Convert from uhd::time_spec to timestamp
    boost::optional<uint64_t> timestamp;
    if (time_spec != time_spec_t::ASAP) {
        if (!_timebase_clk.is_running()) {
            throw uhd::system_error("Timebase clock is not running");
        }
        timestamp = time_spec.to_ticks(_timebase_clk.get_freq());
    }
    std::unique_lock<std::mutex> lock(_mutex);
    // Assemble the control payload
    ctrl_payload tx_ctrl;
    tx_ctrl.dst_port    = _local_port;
    tx_ctrl.src_port    = _local_port;
    tx_ctrl.seq_num     = _tx_seq_num;
    tx_ctrl.timestamp   = timestamp;
    tx_ctrl.is_ack      = false;
    tx_ctrl.src_epid    = _my_epid;
    tx_ctrl.address     = address;
    tx_ctrl.data_vtr    = data_vtr;
    tx_ctrl.byte_enable = 0xF;
    tx_ctrl.op_code     = op_code;
    tx_ctrl.status      = CMD_OKAY;
    // Perform flow control
    // If there is no room in the downstream buffer, then wait until the
    // timeout
    const size_t pyld_size = get_payload_size(tx_ctrl);
    auto buff_not_full     = [this, pyld_size]() -> bool {
        // Hold back room in the buffer for the async message responses.
        // If the current request fits into what remains, we can proceed.
        return (_buff_occupied + pyld_size)
               <= (_buff_capacity
                   - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
    };
    if (!buff_not_full()) {
        // If there is a timed command in the queue, use the
        // MASSIVE_TIMEOUT instead
        auto timeout_time =
            start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
        if (not _buff_free_cond.wait_until(lock, timeout_time, buff_not_full)) {
            throw uhd::op_timeout(
                "Control operation timed out waiting for space in command buffer");
        }
    }
    _buff_occupied += pyld_size;
    _req_queue.push_back(tx_ctrl);
    const bool wait_ack = require_ack || _policy.force_acks;
    const wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address};
    if (wait_ack) {
        // If the client wants an ACK for this request, make note of its
        // details in a set. This set will be consulted when responses are
        // received.
        _wanted_acks.insert(ack_key);
    }
    // Distinguishes a failed send from a failure while waiting for the ACK
    bool request_sent = false;
    try {
        // Send the payload as soon as there is room in the buffer
        _handle_send(tx_ctrl, _policy.timeout);
        request_sent = true;
        _tx_seq_num  = (_tx_seq_num + 1) % 64;
        if (wait_ack) {
            auto response = wait_for_ack(tx_ctrl, lock);
            return {tx_ctrl, response};
        } else {
            return {tx_ctrl, {}};
        }
    } catch (...) {
        // Something went wrong while sending the request or waiting for its
        // ACK. Remove the entry from the ACK tracking set.
        _wanted_acks.erase(ack_key);
        if (!request_sent) {
            // The request never left, so no response will ever arrive for
            // it: undo the bookkeeping. The mutex has been held since the
            // push_back() above, so the request is still the last queue
            // entry.
            _req_queue.pop_back();
            _buff_occupied -= pyld_size;
            _buff_free_cond.notify_one();
        }
        throw;
    }
}
//! Waits for and returns the ACK for the specified request
//
// \param request The request whose response is awaited; responses are
//        matched by sequence number, opcode and address
// \param lock The already-held lock on _mutex; it is released while
//        waiting on the condition variable
// \returns The validated response
// \throws uhd::op_timeout if no matching response arrived in time
// Exceptions thrown by validate_ack() are propagated.
const ctrl_payload wait_for_ack(
    const ctrl_payload& request, std::unique_lock<std::mutex>& lock)
{
    // Determine the timepoint (now plus some offset, depending on
    // whether a timed command is in the request queue) at which point we
    // flag a timeout.
    const auto timeout_time =
        start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);

    // Locates the queued response that belongs to the request, or returns
    // the end iterator. Entries are inspected by reference so that
    // non-matching payloads are not copied.
    const auto find_response = [this, &request]() {
        auto candidate = _resp_queue.begin();
        for (; candidate != _resp_queue.end(); ++candidate) {
            const ctrl_payload& payload = std::get<0>(*candidate);
            if (payload.seq_num == request.seq_num
                && payload.op_code == request.op_code
                && payload.address == request.address) {
                break;
            }
        }
        return candidate;
    };

    // Check the queue for the response for the specific request, looping
    // until it's found or the timeout timepoint has been reached.
    bool timed_out = false;
    while (true) {
        const auto match = find_response();
        if (match != _resp_queue.end()) {
            // Copy the response out before removing it from the queue;
            // other elements are left undisturbed
            const ctrl_payload rx_ctrl          = std::get<0>(*match);
            const response_status_t resp_status = std::get<1>(*match);
            _resp_queue.erase(match);
            // Validate transaction status, either returning the response
            // if everything checks out, or throwing an exception if the
            // status indicates an error
            return validate_ack(rx_ctrl, resp_status);
        }
        // The queue was checked once more after the timeout was reported
        // (a response may have been queued while this thread was
        // re-acquiring the lock), and it still isn't there: give up
        if (timed_out) {
            break;
        }
        // No matching response yet. Wait until a new response is added to
        // the queue (or the timeout timepoint is reached) and look again.
        timed_out = (_resp_ready_cond.wait_until(lock, timeout_time)
                     == std::cv_status::timeout);
    }
    throw uhd::op_timeout("Control operation timed out waiting for ACK");
}
//! Checks a response and returns it unchanged if it passes all checks
//
// Checks are applied in this order: transaction status, presence of data,
// software response status.
//
// \param rx_ctrl The response to check
// \param resp_status The software status recorded for this response
// \throws uhd::op_failed for a CMD_CMDERR status or an empty data vector
// \throws uhd::op_timerr for a CMD_TSERR status or a RESP_RTERR response
// \throws uhd::op_seqerr for a RESP_DROPPED response
const ctrl_payload validate_ack(
    const ctrl_payload& rx_ctrl, response_status_t resp_status) const
{
    // Transaction status reported in the response itself
    switch (rx_ctrl.status) {
        case CMD_CMDERR:
            throw uhd::op_failed("Control operation returned a failing status");
        case CMD_TSERR:
            throw uhd::op_timerr("Control operation returned a timestamp error");
        default:
            break;
    }
    // Check data vector size
    if (rx_ctrl.data_vtr.empty()) {
        throw uhd::op_failed("Control operation returned a malformed response");
    }
    // Software status recorded when the response was received
    switch (resp_status) {
        case RESP_DROPPED:
            throw uhd::op_seqerr("Response for a control transaction was dropped");
        case RESP_RTERR:
            throw uhd::op_timerr("Control operation encountered a routing error");
        default:
            break;
    }
    return rx_ctrl;
}
//! Logs (at debug level) a response with a failing status that belongs to
//  a request for which the client did not ask for an ACK
void log_response_packet_error(const ctrl_payload& resp)
{
    std::string packet = resp.to_string();
    // Remove the trailing \n, but only if there is one: pop_back() on an
    // empty string is undefined behaviour
    if (!packet.empty() && packet.back() == '\n') {
        packet.pop_back();
    }
    UHD_LOG_DEBUG("CTRLEP",
        "Control response for ack-less request returned a failing status: "
            << packet);
}
//! Logs (at debug level) a request whose response was dropped and for
//  which the client did not ask for an ACK
void log_dropped_packet(const ctrl_payload& resp)
{
    std::string packet = resp.to_string();
    // Remove the trailing \n, but only if there is one: pop_back() on an
    // empty string is undefined behaviour
    if (!packet.empty() && packet.back() == '\n') {
        packet.pop_back();
    }
    UHD_LOG_DEBUG(
        "CTRLEP", "Control response for ack-less request was dropped: " << packet);
}
//! The parameters associated with the policy that governs this object
struct policy_args
{
// Timeout (in seconds) for sending requests and waiting for ACKs
double timeout = DEFAULT_TIMEOUT;
// If true, every request waits for its ACK, whether or not the caller
// asked for one
bool force_acks = DEFAULT_FORCE_ACKS;
};
//! Function to call to send a control packet
const send_fn_t _handle_send;
//! The endpoint ID of this software endpoint
const sep_id_t _my_epid;
//! The local port number on the control crossbar for this ctrlport endpoint
const uint16_t _local_port;
//! The downstream buffer capacity in 32-bit words (used for flow control)
const size_t _buff_capacity;
//! The max number of outstanding async messages that a block can have at any time
const size_t _max_outstanding_async_msgs;
//! The clock that drives the ctrlport endpoint
const clock_iface& _client_clk;
//! The clock that drives the timing logic for the ctrlport endpoint
const clock_iface& _timebase_clk;
//! The function to call to validate an async message (by default, all async
// messages are considered valid)
async_msg_validator_t _validate_async_msg =
[](uint32_t, const std::vector<uint32_t>&) { return true; };
//! The function to call to handle an async message (empty by default)
async_msg_callback_t _handle_async_msg = async_msg_callback_t();
//! The current control sequence number of outgoing packets (wraps at 64)
uint8_t _tx_seq_num = 0;
//! The number of occupied words in the downstream buffer
ssize_t _buff_occupied = 0;
//! The arguments for the policy that governs this register interface
policy_args _policy;
//! A condition variable that holds the "downstream buffer is free" condition
std::condition_variable _buff_free_cond;
//! A queue that holds all outstanding requests
std::deque<ctrl_payload> _req_queue;
//! A queue that holds all outstanding responses and their status
std::deque<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
//! A condition variable that holds the "response is available" condition
std::condition_variable _resp_ready_cond;
//! A mutex to protect all state in this class
std::mutex _mutex;
//! A set of {sequence number, opcode, address} triples associated with
// request packets for which the client cares about receiving ACKs
using wanted_ack_key = std::tuple<uint8_t, ctrl_opcode_t, uint32_t>;
std::set<wanted_ack_key> _wanted_acks;
};
//! Factory: creates a ctrlport_endpoint_impl from the given arguments and
//  returns it through the ctrlport_endpoint interface pointer type
ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,
    sep_id_t this_epid,
    uint16_t local_port,
    size_t buff_capacity,
    size_t max_outstanding_async_msgs,
    const clock_iface& client_clk,
    const clock_iface& timebase_clk)
{
    ctrlport_endpoint::sptr endpoint =
        std::make_shared<ctrlport_endpoint_impl>(handle_send,
            this_epid,
            local_port,
            buff_capacity,
            max_outstanding_async_msgs,
            client_clk,
            timebase_clk);
    return endpoint;
}
|