aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/ctrlport_endpoint.cpp
diff options
context:
space:
mode:
authorAaron Rossetto <aaron.rossetto@ni.com>2022-02-17 12:15:21 -0600
committerAaron Rossetto <aaron.rossetto@ni.com>2022-03-03 14:09:27 -0600
commit8601d225d52b5d5d3876f633bf5d3a97ff475bda (patch)
treed4c47b15075324c21690514fd3a7c02bcf6ddb72 /host/lib/rfnoc/ctrlport_endpoint.cpp
parent7599789f83bcf9f9e9bdef3432c37e460bddc1e4 (diff)
downloaduhd-8601d225d52b5d5d3876f633bf5d3a97ff475bda.tar.gz
uhd-8601d225d52b5d5d3876f633bf5d3a97ff475bda.tar.bz2
uhd-8601d225d52b5d5d3876f633bf5d3a97ff475bda.zip
rfnoc: Refactor ctrlport_endpoint; fix MT issues
This commit refactors ctrlport_endpoint and fixes several issues related to multiple threads sending and receiving control transfers. First, it refactors the change that Martin Braun implemented in 0caed5529 by adding a tracking mechanism for control requests where clients have explicitly asked to receive an ACK when the corresponding control response is received. When a client wants to wait for an ACK associated with a control request, a combination of that request's opcode, address, and sequence number is added to a set when the request is sent. When a control response is received, the set is consulted to see if the corresponding request is there by matching the packet field data listed above. If so, the control response is added to the response queue, thus notifying all threads waiting in `wait_for_ack()` that there is a response that the thread may be waiting on. If the request is not in the set, the request is never added to the response queue. This prevents the initial problem that 0caed5529 was addressing of the response queue growing infinitely large with control responses that would never be popped from the queue. Secondly, it addresses issues when multiple threads have sent a request packet and are waiting in `wait_for_ack()` on the corresponding response. Originally, the function contained a loop which would sleep the calling thread until the control response queue had at least one element in it. When awakened, the thread would pop the frontmost control response off the queue to see if it matches the corresponding control request (i.e., has the same sequence number, opcode, and address elements). If so, the response would be handled appropriately, which may include signalling an error if the response indicates an exceptional status, and the function would return. If the response is not a matching one, the function would return to the top of the loop. If the corresponding response is not found within a specified period, the function would throw an op_timeout exception. However, there is a subtle issue with this algorithm when two different calling threads submit control requests and end up calling `wait_for_ack()` nearly simultaneously. Consider two threads issuing a control request. Thread T1 issues a request with sequence number 1 and thread T2 issues a request with sequence number 2. The two threads then call `wait_for_ack()`. Let's assume that neither of the control reponses have arrived yet. Both threads sleep, waiting to be notified of a response. Now the response for sequence number 1 arrives and is pushed to the front the response queue. This generates a signal that awakes one of the waiting threads, but which one is awakened is completely at the mercy of the scheduler. If T1 is awakened first, it pops the response from the queue, finds that it matches the request, and handles it as expected. Later, when the reponse for sequence number 2 is pushed onto the queue, the still-sleeping T2 will be awakened. It pops the response, finds it to be matching, and all is well. But if the scheduler decides to wake T2 first, T2 ends up popping the response with sequence number 1 off the front of the queue, but it doesn't match the request that T2 sent with sequence number 2, so T2 goes back to the top of the loop. At this point, it doesn't matter if T2 or T1 is awakened next; because the control response for sequence number 1 was already popped off the queue, T1 never sees the control response it expects, and will throw uhd::op_timeout back up the stack. This commit modifies the `wait_for_ack()` algorithm to search the queue for a matching response rather than indiscriminately popping the frontmost element from the queue and throwing it away if it doesn't match. That way, the order in which threads are awakened no longer matters as they will be able to find the corresponding response regardless. Furthermore, when a response is pushed onto the response queue, all waiting threads are notified of the condition via `notify_all()`, rather than just waking one thread at random (`notify_one()`). This gives all waiting threads the opportunity to check the queue for a response. Finally, the `wait_for_ack()` loop has been modified such that the thread waits to be signalled regardless of whether the queue has elements in it or not. (Prior to this change, the thread would only wait to be signalled if the queue was empty.) This effectively implements the behavior that all threads are awakened when a new control response is pushed into the queue, and combined with the changes above, ensures that all threads get a chance to react and check the queue when the queue is modified.
Diffstat (limited to 'host/lib/rfnoc/ctrlport_endpoint.cpp')
-rw-r--r--host/lib/rfnoc/ctrlport_endpoint.cpp291
1 files changed, 170 insertions, 121 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
index 3a569fae1..cad06f5e2 100644
--- a/host/lib/rfnoc/ctrlport_endpoint.cpp
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -14,8 +14,7 @@
#include <deque>
#include <mutex>
#include <numeric>
-#include <queue>
-
+#include <set>
using namespace uhd;
using namespace uhd::rfnoc;
@@ -64,11 +63,8 @@ public:
uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
bool ack = false) override
{
- // Send request
- auto request = send_request_packet(OP_WRITE, addr, {data}, timestamp);
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ // Send request and optionally wait for an ACK
+ send_request_packet(OP_WRITE, addr, {data}, timestamp, ack);
}
void multi_poke32(const std::vector<uint32_t> addrs,
@@ -100,23 +96,19 @@ public:
}
/* TODO: Uncomment when the atomic block poke is implemented in the FPGA
- // Send request
- auto request = send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp);
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ // Send request and optionally want for an ACK
+ send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp, ack);
*/
}
uint32_t peek32(
uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) override
{
- // Send request
- auto request = send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp);
-
- // Wait for an ACK
- auto response = wait_for_ack(request, true).get();
- return response.data_vtr[0];
+ // Send request and wait for an ACK
+ boost::optional<ctrl_payload> response;
+ std::tie(std::ignore, response) =
+ send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp);
+ return response.get().data_vtr[0];
}
std::vector<uint32_t> block_peek32(uint32_t first_addr,
@@ -131,15 +123,14 @@ public:
return values;
/* TODO: Uncomment when the atomic block peek is implemented in the FPGA
- // Send request
- auto request = send_request_packet(OP_READ,
+ // Send request and wait for an ACK
+ boost::optional<ctrl_payload> response;
+ std::tie(std::ignore, response) = send_request_packet(OP_READ,
first_addr,
std::vector<uint32_t>(length, 0),
timestamp);
- // Wait for an ACK
- auto response = wait_for_ack(request, true).get();
- return response.data_vtr;
+ return response.get().data_vtr;
*/
}
@@ -153,30 +144,24 @@ public:
// TODO: Uncomment when this is implemented in the FPGA
throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
- // Send request
- auto request = send_request_packet(OP_POLL,
+ // Send request and optionally wait for an ACK
+ send_request_packet(OP_POLL,
addr,
{data,
mask,
static_cast<uint32_t>(timeout.to_ticks(_timebase_clk.get_freq()))},
- timestamp);
-
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ timestamp,
+ ack);
}
void sleep(uhd::time_spec_t duration, bool ack = false) override
{
- // Send request
- auto request = send_request_packet(OP_SLEEP,
+ // Send request and optionally wait for an ACK
+ send_request_packet(OP_SLEEP,
0,
{static_cast<uint32_t>(duration.to_ticks(_timebase_clk.get_freq()))},
- uhd::time_spec_t::ASAP);
-
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ uhd::time_spec_t::ASAP,
+ ack);
}
void register_async_msg_validator(async_msg_validator_t callback_f) override
@@ -217,20 +202,44 @@ public:
}
// Pop the request from the queue
_req_queue.pop_front();
- // Push the response into the response queue
- _resp_queue.push(std::make_tuple(rx_ctrl, resp_status));
- _resp_ready_cond.notify_one();
+ // Push the response into the response queue if and only if
+ // the client wanted an ACK for this packet
+ wanted_ack_key request_key{
+ rx_ctrl.seq_num, rx_ctrl.op_code, rx_ctrl.address};
+ if (_wanted_acks.count(request_key)) {
+ _wanted_acks.erase(request_key);
+ _resp_queue.push_back(std::make_tuple(rx_ctrl, resp_status));
+ _resp_ready_cond.notify_all();
+ } else {
+ // If the client didn't want an ACK for this request, but
+ // the response indicated an error, then provide some
+ // feedback
+ if (rx_ctrl.status != CMD_OKAY) {
+ log_response_packet_error(rx_ctrl);
+ }
+ }
};
// Function to process a response with sequence errors
auto process_incorrect_response = [this]() {
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
_buff_free_cond.notify_one();
- // Push a fabricated response into the response queue
+ // Push a fabricated response into the response queue only
+ // if the client was waiting on an ACK for the original
+ // request whose response was lost
ctrl_payload resp(_req_queue.front());
- resp.is_ack = true;
- _resp_queue.push(std::make_tuple(resp, RESP_DROPPED));
- _resp_ready_cond.notify_one();
+ wanted_ack_key request_key{resp.seq_num, resp.op_code, resp.address};
+ if (_wanted_acks.count(request_key)) {
+ _wanted_acks.erase(request_key);
+ resp.is_ack = true;
+ _resp_queue.push_back(std::make_tuple(resp, RESP_DROPPED));
+ _resp_ready_cond.notify_all();
+ } else {
+ // If the client did not want an ACK for this request, but
+ // we fabricated a response due to a sequence error, then
+ // provide feedback
+ log_dropped_packet(resp);
+ }
// Pop the request from the queue
_req_queue.pop_front();
};
@@ -321,6 +330,9 @@ public:
}
private:
+ //! The software status (different from the transaction status) of the response
+ enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
+
//! Returns the length of the control payload in 32-bit words
inline static size_t get_payload_size(const ctrl_payload& payload)
{
@@ -344,11 +356,14 @@ private:
return false;
}
- //! Sends a request control packet to a remote device
- const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
+ //! Sends a request control packet to a remote device, optionally waiting
+ // for an ACK, and returns any response if applicable
+ const std::pair<ctrl_payload, boost::optional<ctrl_payload>> send_request_packet(
+ ctrl_opcode_t op_code,
uint32_t address,
const std::vector<uint32_t>& data_vtr,
- const uhd::time_spec_t& time_spec)
+ const uhd::time_spec_t& time_spec,
+ const bool require_ack = true)
{
if (!_client_clk.is_running()) {
throw uhd::system_error("Ctrlport client clock is not running");
@@ -387,7 +402,7 @@ private:
// If we can fit the current request in the queue then we can proceed
return (_buff_occupied + pyld_size)
<= (_buff_capacity
- - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
+ - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
};
if (!buff_not_full()) {
// If there is a timed command in the queue, use the
@@ -403,90 +418,122 @@ private:
_buff_occupied += pyld_size;
_req_queue.push_back(tx_ctrl);
- // Send the payload as soon as there is room in the buffer
- _handle_send(tx_ctrl, _policy.timeout);
- _tx_seq_num = (_tx_seq_num + 1) % 64;
+ if (require_ack || _policy.force_acks) {
+ // If the client wants an ACK for this request, make note of its
+ // details in a set. This set will be consulted when responses are
+ // received.
+ wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address};
+ _wanted_acks.insert(ack_key);
+ }
- return tx_ctrl;
+ try {
+ // Send the payload as soon as there is room in the buffer
+ _handle_send(tx_ctrl, _policy.timeout);
+ _tx_seq_num = (_tx_seq_num + 1) % 64;
+
+ if (require_ack || _policy.force_acks) {
+ auto response = wait_for_ack(tx_ctrl, lock);
+ return {tx_ctrl, response};
+ } else {
+ return {tx_ctrl, {}};
+ }
+ } catch (...) {
+ // Something went wrong while trying to send the request.
+ // Remove the entry from the ACK tracking set.
+ wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address};
+ _wanted_acks.erase(ack_key);
+ throw;
+ }
}
//! Waits for and returns the ACK for the specified request
- //
- // \param request The request for which we are awaiting the response
- // \param require_ack A Boolean flag which indicates if we really need that
- // response. If false, we reduce the timeout to zero
- // and return an empty control payload if we can't find
- // the relevant ACK. This can be used to help clear the
- // response queue without waiting.
- // \returns The response payload corresponding to \p requst. If \p require_ack
- // is false, this may also be an empty ctrl_payload object with no
- // meaningful content.
- // \throws uhd::op_timeout if require_ack is true, and we exceed the timeout
- // set by the current policy. Throws various other uhd::rfnoc_error
- // when there was a communication issue (see the code for details).
- const boost::optional<ctrl_payload> wait_for_ack(
- const ctrl_payload& request, const bool require_ack)
+ const ctrl_payload wait_for_ack(
+ const ctrl_payload& request, std::unique_lock<std::mutex>& lock)
{
- auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
- while (true) {
- std::unique_lock<std::mutex> lock(_mutex);
- // Wait until there is a response in the response queue
- if (!resp_ready()) {
- if (!require_ack) {
- return {};
- }
- // If we're waiting for a timed command or if we have a
- // command in the queue, use the MASSIVE_TIMEOUT instead
- auto timeout_time = start_timeout(
- check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
-
- if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
- throw uhd::op_timeout("Control operation timed out waiting for ACK");
- }
- }
+ // Determine the timepoint (now plus some offset, depending on
+ // whether we're waiting for a timed command or if we have a
+ // command in the queue) at which point we flag a timeout.
+ const auto timeout_time =
+ start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
+
+ // Check the queue for the response for the specific request, looping
+ // until it's found or the timeout timepoint has been reached.
+ do {
// Extract the response packet
ctrl_payload rx_ctrl;
response_status_t resp_status;
- std::tie(rx_ctrl, resp_status) = _resp_queue.front();
- _resp_queue.pop();
- // Check if this is the response meant for the request
- // Filter by op_code, address and seq_num
- if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code
- && rx_ctrl.address == request.address) {
- // If we find the response for our request, and the client
- // has not requested acknowledgements, then simply return,
- // ignoring any error status in the response. This restores
- // the pre-0caed5529 behavior.
- if (!require_ack) {
- return {};
- }
- // Validate transaction status
- if (rx_ctrl.status == CMD_CMDERR) {
- throw uhd::op_failed("Control operation returned a failing status");
- } else if (rx_ctrl.status == CMD_TSERR) {
- throw uhd::op_timerr("Control operation returned a timestamp error");
- }
- // Check data vector size
- if (rx_ctrl.data_vtr.empty()) {
- throw uhd::op_failed(
- "Control operation returned a malformed response");
+ // Iterate the response queue, looking for the response that is
+ // meant for the request. When found, it is removed from the queue.
+ // Other elements are left undisturbed.
+ for (auto q_iterator = _resp_queue.begin(); q_iterator != _resp_queue.end();
+ q_iterator++) {
+ std::tie(rx_ctrl, resp_status) = *q_iterator;
+ // Check if this is the response meant for the request
+ // Filter by op_code, address and seq_num
+ if (rx_ctrl.seq_num == request.seq_num
+ && rx_ctrl.op_code == request.op_code
+ && rx_ctrl.address == request.address) {
+ // Remove the response from the queue
+ _resp_queue.erase(q_iterator);
+
+ // Validate transaction status, either returning the
+ // response if everything checks out, or throwing an
+ // exception if the status indicates an error
+ return validate_ack(rx_ctrl, resp_status);
}
- // Validate response status
- if (resp_status == RESP_DROPPED) {
- throw uhd::op_seqerr(
- "Response for a control transaction was dropped");
- } else if (resp_status == RESP_RTERR) {
- throw uhd::op_timerr("Control operation encountered a routing error");
- }
- return rx_ctrl;
- } else {
- // This response does not belong to the request we passed in. Move on.
- continue;
}
+ // If we got here, that means we iterated the queue and did NOT
+ // find a matching response to the request that this thread sent.
+ // We can't return, because if we're in this function, the caller
+ // specifically requested to receive an ACK for their request,
+ // and we haven't received it yet. So we must wait until a new
+ // request is added to the queue and try again. If we reach the
+ // timeout timepoint before a new request is added, then throw the
+ // timeout exception.
+ } while (
+ _resp_ready_cond.wait_until(lock, timeout_time) != std::cv_status::timeout);
+
+ throw uhd::op_timeout("Control operation timed out waiting for ACK");
+ }
+
+ const ctrl_payload validate_ack(
+ const ctrl_payload& rx_ctrl, response_status_t resp_status) const
+ {
+ if (rx_ctrl.status == CMD_CMDERR) {
+ throw uhd::op_failed("Control operation returned a failing status");
+ } else if (rx_ctrl.status == CMD_TSERR) {
+ throw uhd::op_timerr("Control operation returned a timestamp error");
}
+ // Check data vector size
+ if (rx_ctrl.data_vtr.empty()) {
+ throw uhd::op_failed("Control operation returned a malformed response");
+ }
+ // Validate response status
+ if (resp_status == RESP_DROPPED) {
+ throw uhd::op_seqerr("Response for a control transaction was dropped");
+ } else if (resp_status == RESP_RTERR) {
+ throw uhd::op_timerr("Control operation encountered a routing error");
+ }
+ return rx_ctrl;
}
+ void log_response_packet_error(const ctrl_payload& resp)
+ {
+ std::string packet = resp.to_string();
+ packet.pop_back(); // Remove the trailing \n
+ UHD_LOG_DEBUG("CTRLEP",
+ "Control response for ack-less request returned a failing status: "
+ << packet);
+ }
+
+ void log_dropped_packet(const ctrl_payload& resp)
+ {
+ std::string packet = resp.to_string();
+ packet.pop_back(); // Remove the trailing \n
+ UHD_LOG_DEBUG(
+ "CTRLEP", "Control response for ack-less request was dropped: " << packet);
+ }
//! The parameters associated with the policy that governs this object
struct policy_args
@@ -494,8 +541,6 @@ private:
double timeout = DEFAULT_TIMEOUT;
bool force_acks = DEFAULT_FORCE_ACKS;
};
- //! The software status (different from the transaction status) of the response
- enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
//! Function to call to send a control packet
const send_fn_t _handle_send;
@@ -529,11 +574,15 @@ private:
//! A queue that holds all outstanding requests
std::deque<ctrl_payload> _req_queue;
//! A queue that holds all outstanding responses and their status
- std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
+ std::deque<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
//! A condition variable that hold the "response is available" condition
std::condition_variable _resp_ready_cond;
//! A mutex to protect all state in this class
std::mutex _mutex;
+ //! A set of {opcode, address, sequence numbers} triples associated with
+ // request packets for which the client cares about receiving ACKs
+ using wanted_ack_key = std::tuple<uint8_t, ctrl_opcode_t, uint32_t>;
+ std::set<wanted_ack_key> _wanted_acks;
};
ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,