aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/rfnoc/ctrlport_endpoint.cpp291
1 files changed, 170 insertions, 121 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
index 3a569fae1..cad06f5e2 100644
--- a/host/lib/rfnoc/ctrlport_endpoint.cpp
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -14,8 +14,7 @@
#include <deque>
#include <mutex>
#include <numeric>
-#include <queue>
-
+#include <set>
using namespace uhd;
using namespace uhd::rfnoc;
@@ -64,11 +63,8 @@ public:
uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
bool ack = false) override
{
- // Send request
- auto request = send_request_packet(OP_WRITE, addr, {data}, timestamp);
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ // Send request and optionally wait for an ACK
+ send_request_packet(OP_WRITE, addr, {data}, timestamp, ack);
}
void multi_poke32(const std::vector<uint32_t> addrs,
@@ -100,23 +96,19 @@ public:
}
/* TODO: Uncomment when the atomic block poke is implemented in the FPGA
- // Send request
- auto request = send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp);
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ // Send request and optionally want for an ACK
+ send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp, ack);
*/
}
uint32_t peek32(
uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) override
{
- // Send request
- auto request = send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp);
-
- // Wait for an ACK
- auto response = wait_for_ack(request, true).get();
- return response.data_vtr[0];
+ // Send request and wait for an ACK
+ boost::optional<ctrl_payload> response;
+ std::tie(std::ignore, response) =
+ send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp);
+ return response.get().data_vtr[0];
}
std::vector<uint32_t> block_peek32(uint32_t first_addr,
@@ -131,15 +123,14 @@ public:
return values;
/* TODO: Uncomment when the atomic block peek is implemented in the FPGA
- // Send request
- auto request = send_request_packet(OP_READ,
+ // Send request and wait for an ACK
+ boost::optional<ctrl_payload> response;
+ std::tie(std::ignore, response) = send_request_packet(OP_READ,
first_addr,
std::vector<uint32_t>(length, 0),
timestamp);
- // Wait for an ACK
- auto response = wait_for_ack(request, true).get();
- return response.data_vtr;
+ return response.get().data_vtr;
*/
}
@@ -153,30 +144,24 @@ public:
// TODO: Uncomment when this is implemented in the FPGA
throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
- // Send request
- auto request = send_request_packet(OP_POLL,
+ // Send request and optionally wait for an ACK
+ send_request_packet(OP_POLL,
addr,
{data,
mask,
static_cast<uint32_t>(timeout.to_ticks(_timebase_clk.get_freq()))},
- timestamp);
-
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ timestamp,
+ ack);
}
void sleep(uhd::time_spec_t duration, bool ack = false) override
{
- // Send request
- auto request = send_request_packet(OP_SLEEP,
+ // Send request and optionally wait for an ACK
+ send_request_packet(OP_SLEEP,
0,
{static_cast<uint32_t>(duration.to_ticks(_timebase_clk.get_freq()))},
- uhd::time_spec_t::ASAP);
-
- // Optionally wait for an ACK
- const bool require_ack = ack || _policy.force_acks;
- wait_for_ack(request, require_ack);
+ uhd::time_spec_t::ASAP,
+ ack);
}
void register_async_msg_validator(async_msg_validator_t callback_f) override
@@ -217,20 +202,44 @@ public:
}
// Pop the request from the queue
_req_queue.pop_front();
- // Push the response into the response queue
- _resp_queue.push(std::make_tuple(rx_ctrl, resp_status));
- _resp_ready_cond.notify_one();
+ // Push the response into the response queue if and only if
+ // the client wanted an ACK for this packet
+ wanted_ack_key request_key{
+ rx_ctrl.seq_num, rx_ctrl.op_code, rx_ctrl.address};
+ if (_wanted_acks.count(request_key)) {
+ _wanted_acks.erase(request_key);
+ _resp_queue.push_back(std::make_tuple(rx_ctrl, resp_status));
+ _resp_ready_cond.notify_all();
+ } else {
+ // If the client didn't want an ACK for this request, but
+ // the response indicated an error, then provide some
+ // feedback
+ if (rx_ctrl.status != CMD_OKAY) {
+ log_response_packet_error(rx_ctrl);
+ }
+ }
};
// Function to process a response with sequence errors
auto process_incorrect_response = [this]() {
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
_buff_free_cond.notify_one();
- // Push a fabricated response into the response queue
+ // Push a fabricated response into the response queue only
+ // if the client was waiting on an ACK for the original
+ // request whose response was lost
ctrl_payload resp(_req_queue.front());
- resp.is_ack = true;
- _resp_queue.push(std::make_tuple(resp, RESP_DROPPED));
- _resp_ready_cond.notify_one();
+ wanted_ack_key request_key{resp.seq_num, resp.op_code, resp.address};
+ if (_wanted_acks.count(request_key)) {
+ _wanted_acks.erase(request_key);
+ resp.is_ack = true;
+ _resp_queue.push_back(std::make_tuple(resp, RESP_DROPPED));
+ _resp_ready_cond.notify_all();
+ } else {
+ // If the client did not want an ACK for this request, but
+ // we fabricated a response due to a sequence error, then
+ // provide feedback
+ log_dropped_packet(resp);
+ }
// Pop the request from the queue
_req_queue.pop_front();
};
@@ -321,6 +330,9 @@ public:
}
private:
+ //! The software status (different from the transaction status) of the response
+ enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
+
//! Returns the length of the control payload in 32-bit words
inline static size_t get_payload_size(const ctrl_payload& payload)
{
@@ -344,11 +356,14 @@ private:
return false;
}
- //! Sends a request control packet to a remote device
- const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
+ //! Sends a request control packet to a remote device, optionally waiting
+ // for an ACK, and returns any response if applicable
+ const std::pair<ctrl_payload, boost::optional<ctrl_payload>> send_request_packet(
+ ctrl_opcode_t op_code,
uint32_t address,
const std::vector<uint32_t>& data_vtr,
- const uhd::time_spec_t& time_spec)
+ const uhd::time_spec_t& time_spec,
+ const bool require_ack = true)
{
if (!_client_clk.is_running()) {
throw uhd::system_error("Ctrlport client clock is not running");
@@ -387,7 +402,7 @@ private:
// If we can fit the current request in the queue then we can proceed
return (_buff_occupied + pyld_size)
<= (_buff_capacity
- - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
+ - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
};
if (!buff_not_full()) {
// If there is a timed command in the queue, use the
@@ -403,90 +418,122 @@ private:
_buff_occupied += pyld_size;
_req_queue.push_back(tx_ctrl);
- // Send the payload as soon as there is room in the buffer
- _handle_send(tx_ctrl, _policy.timeout);
- _tx_seq_num = (_tx_seq_num + 1) % 64;
+ if (require_ack || _policy.force_acks) {
+ // If the client wants an ACK for this request, make note of its
+ // details in a set. This set will be consulted when responses are
+ // received.
+ wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address};
+ _wanted_acks.insert(ack_key);
+ }
- return tx_ctrl;
+ try {
+ // Send the payload as soon as there is room in the buffer
+ _handle_send(tx_ctrl, _policy.timeout);
+ _tx_seq_num = (_tx_seq_num + 1) % 64;
+
+ if (require_ack || _policy.force_acks) {
+ auto response = wait_for_ack(tx_ctrl, lock);
+ return {tx_ctrl, response};
+ } else {
+ return {tx_ctrl, {}};
+ }
+ } catch (...) {
+ // Something went wrong while trying to send the request.
+ // Remove the entry from the ACK tracking set.
+ wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address};
+ _wanted_acks.erase(ack_key);
+ throw;
+ }
}
//! Waits for and returns the ACK for the specified request
- //
- // \param request The request for which we are awaiting the response
- // \param require_ack A Boolean flag which indicates if we really need that
- // response. If false, we reduce the timeout to zero
- // and return an empty control payload if we can't find
- // the relevant ACK. This can be used to help clear the
- // response queue without waiting.
- // \returns The response payload corresponding to \p requst. If \p require_ack
- // is false, this may also be an empty ctrl_payload object with no
- // meaningful content.
- // \throws uhd::op_timeout if require_ack is true, and we exceed the timeout
- // set by the current policy. Throws various other uhd::rfnoc_error
- // when there was a communication issue (see the code for details).
- const boost::optional<ctrl_payload> wait_for_ack(
- const ctrl_payload& request, const bool require_ack)
+ const ctrl_payload wait_for_ack(
+ const ctrl_payload& request, std::unique_lock<std::mutex>& lock)
{
- auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
- while (true) {
- std::unique_lock<std::mutex> lock(_mutex);
- // Wait until there is a response in the response queue
- if (!resp_ready()) {
- if (!require_ack) {
- return {};
- }
- // If we're waiting for a timed command or if we have a
- // command in the queue, use the MASSIVE_TIMEOUT instead
- auto timeout_time = start_timeout(
- check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
-
- if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
- throw uhd::op_timeout("Control operation timed out waiting for ACK");
- }
- }
+ // Determine the timepoint (now plus some offset, depending on
+ // whether we're waiting for a timed command or if we have a
+ // command in the queue) at which point we flag a timeout.
+ const auto timeout_time =
+ start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
+
+ // Check the queue for the response for the specific request, looping
+ // until it's found or the timeout timepoint has been reached.
+ do {
// Extract the response packet
ctrl_payload rx_ctrl;
response_status_t resp_status;
- std::tie(rx_ctrl, resp_status) = _resp_queue.front();
- _resp_queue.pop();
- // Check if this is the response meant for the request
- // Filter by op_code, address and seq_num
- if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code
- && rx_ctrl.address == request.address) {
- // If we find the response for our request, and the client
- // has not requested acknowledgements, then simply return,
- // ignoring any error status in the response. This restores
- // the pre-0caed5529 behavior.
- if (!require_ack) {
- return {};
- }
- // Validate transaction status
- if (rx_ctrl.status == CMD_CMDERR) {
- throw uhd::op_failed("Control operation returned a failing status");
- } else if (rx_ctrl.status == CMD_TSERR) {
- throw uhd::op_timerr("Control operation returned a timestamp error");
- }
- // Check data vector size
- if (rx_ctrl.data_vtr.empty()) {
- throw uhd::op_failed(
- "Control operation returned a malformed response");
+ // Iterate the response queue, looking for the response that is
+ // meant for the request. When found, it is removed from the queue.
+ // Other elements are left undisturbed.
+ for (auto q_iterator = _resp_queue.begin(); q_iterator != _resp_queue.end();
+ q_iterator++) {
+ std::tie(rx_ctrl, resp_status) = *q_iterator;
+ // Check if this is the response meant for the request
+ // Filter by op_code, address and seq_num
+ if (rx_ctrl.seq_num == request.seq_num
+ && rx_ctrl.op_code == request.op_code
+ && rx_ctrl.address == request.address) {
+ // Remove the response from the queue
+ _resp_queue.erase(q_iterator);
+
+ // Validate transaction status, either returning the
+ // response if everything checks out, or throwing an
+ // exception if the status indicates an error
+ return validate_ack(rx_ctrl, resp_status);
}
- // Validate response status
- if (resp_status == RESP_DROPPED) {
- throw uhd::op_seqerr(
- "Response for a control transaction was dropped");
- } else if (resp_status == RESP_RTERR) {
- throw uhd::op_timerr("Control operation encountered a routing error");
- }
- return rx_ctrl;
- } else {
- // This response does not belong to the request we passed in. Move on.
- continue;
}
+ // If we got here, that means we iterated the queue and did NOT
+ // find a matching response to the request that this thread sent.
+ // We can't return, because if we're in this function, the caller
+ // specifically requested to receive an ACK for their request,
+ // and we haven't received it yet. So we must wait until a new
+ // request is added to the queue and try again. If we reach the
+ // timeout timepoint before a new request is added, then throw the
+ // timeout exception.
+ } while (
+ _resp_ready_cond.wait_until(lock, timeout_time) != std::cv_status::timeout);
+
+ throw uhd::op_timeout("Control operation timed out waiting for ACK");
+ }
+
+ const ctrl_payload validate_ack(
+ const ctrl_payload& rx_ctrl, response_status_t resp_status) const
+ {
+ if (rx_ctrl.status == CMD_CMDERR) {
+ throw uhd::op_failed("Control operation returned a failing status");
+ } else if (rx_ctrl.status == CMD_TSERR) {
+ throw uhd::op_timerr("Control operation returned a timestamp error");
}
+ // Check data vector size
+ if (rx_ctrl.data_vtr.empty()) {
+ throw uhd::op_failed("Control operation returned a malformed response");
+ }
+ // Validate response status
+ if (resp_status == RESP_DROPPED) {
+ throw uhd::op_seqerr("Response for a control transaction was dropped");
+ } else if (resp_status == RESP_RTERR) {
+ throw uhd::op_timerr("Control operation encountered a routing error");
+ }
+ return rx_ctrl;
}
+ void log_response_packet_error(const ctrl_payload& resp)
+ {
+ std::string packet = resp.to_string();
+ packet.pop_back(); // Remove the trailing \n
+ UHD_LOG_DEBUG("CTRLEP",
+ "Control response for ack-less request returned a failing status: "
+ << packet);
+ }
+
+ void log_dropped_packet(const ctrl_payload& resp)
+ {
+ std::string packet = resp.to_string();
+ packet.pop_back(); // Remove the trailing \n
+ UHD_LOG_DEBUG(
+ "CTRLEP", "Control response for ack-less request was dropped: " << packet);
+ }
//! The parameters associated with the policy that governs this object
struct policy_args
@@ -494,8 +541,6 @@ private:
double timeout = DEFAULT_TIMEOUT;
bool force_acks = DEFAULT_FORCE_ACKS;
};
- //! The software status (different from the transaction status) of the response
- enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
//! Function to call to send a control packet
const send_fn_t _handle_send;
@@ -529,11 +574,15 @@ private:
//! A queue that holds all outstanding requests
std::deque<ctrl_payload> _req_queue;
//! A queue that holds all outstanding responses and their status
- std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
+ std::deque<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
//! A condition variable that hold the "response is available" condition
std::condition_variable _resp_ready_cond;
//! A mutex to protect all state in this class
std::mutex _mutex;
+ //! A set of {opcode, address, sequence numbers} triples associated with
+ // request packets for which the client cares about receiving ACKs
+ using wanted_ack_key = std::tuple<uint8_t, ctrl_opcode_t, uint32_t>;
+ std::set<wanted_ack_key> _wanted_acks;
};
ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,