aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
authorCiro Nishiguchi <ciro.nishiguchi@ni.com>2020-10-17 14:28:34 -0500
committerAaron Rossetto <aaron.rossetto@ni.com>2020-10-20 14:58:52 -0500
commit7b71a9542c28575b61f01e58209a48aefaa143c9 (patch)
tree8584b0abb4de9bac1b3e0c239e022c4ec5f841e0 /host/lib
parent7ee106187e1d0991e7ae79abd905a79dc5760b5f (diff)
downloaduhd-7b71a9542c28575b61f01e58209a48aefaa143c9.tar.gz
uhd-7b71a9542c28575b61f01e58209a48aefaa143c9.tar.bz2
uhd-7b71a9542c28575b61f01e58209a48aefaa143c9.zip
rfnoc: Fix thread unsafe accesses in ctrlport
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/rfnoc/ctrlport_endpoint.cpp91
1 files changed, 35 insertions, 56 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
index 21a07b0d6..de5fa00b7 100644
--- a/host/lib/rfnoc/ctrlport_endpoint.cpp
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -64,14 +64,11 @@ public:
uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
bool ack = false)
{
- // Compute transaction expiration time
- auto timeout_time = start_timeout(_policy.timeout);
// Send request
- auto request =
- send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time);
+ auto request = send_request_packet(OP_WRITE, addr, {data}, timestamp);
// Optionally wait for an ACK
if (ack || _policy.force_acks) {
- wait_for_ack(request, timeout_time);
+ wait_for_ack(request);
}
}
@@ -104,14 +101,11 @@ public:
}
/* TODO: Uncomment when the atomic block poke is implemented in the FPGA
- // Compute transaction expiration time
- auto timeout_time = start_timeout(_policy.timeout);
// Send request
- auto request = send_request_packet(
- OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time);
+ auto request = send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp);
// Optionally wait for an ACK
if (ack || _policy.force_acks) {
- wait_for_ack(request, timeout_time);
+ wait_for_ack(request);
}
*/
}
@@ -119,16 +113,11 @@ public:
virtual uint32_t peek32(
uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
{
- // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
- // command is in the queue
- auto timeout_time =
- start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
-
// Send request
- auto request =
- send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time);
+ auto request = send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp);
+
// Wait for an ACK
- auto response = wait_for_ack(request, timeout_time);
+ auto response = wait_for_ack(request);
return response.data_vtr[0];
}
@@ -144,19 +133,14 @@ public:
return values;
/* TODO: Uncomment when the atomic block peek is implemented in the FPGA
- // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
- // command is in the queue
- auto timeout_time = start_timeout(
- check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout
- );
// Send request
auto request = send_request_packet(OP_READ,
first_addr,
std::vector<uint32_t>(length, 0),
- timestamp,
- timeout_time);
+ timestamp);
+
// Wait for an ACK
- auto response = wait_for_ack(request, timeout_time);
+ auto response = wait_for_ack(request);
return response.data_vtr;
*/
}
@@ -171,39 +155,29 @@ public:
// TODO: Uncomment when this is implemented in the FPGA
throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
- // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
- // command is in the queue
- auto timeout_time =
- start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
-
// Send request
auto request = send_request_packet(OP_POLL,
addr,
{data, mask, static_cast<uint32_t>(timeout.to_ticks(_client_clk.get_freq()))},
- timestamp,
- timeout_time);
+ timestamp);
+
// Optionally wait for an ACK
if (ack || _policy.force_acks) {
- wait_for_ack(request, timeout_time);
+ wait_for_ack(request);
}
}
virtual void sleep(uhd::time_spec_t duration, bool ack = false)
{
- // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed
- // command is in the queue
- auto timeout_time =
- start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout);
-
// Send request
auto request = send_request_packet(OP_SLEEP,
0,
{static_cast<uint32_t>(duration.to_ticks(_client_clk.get_freq()))},
- uhd::time_spec_t::ASAP,
- timeout_time);
+ uhd::time_spec_t::ASAP);
+
// Optionally wait for an ACK
if (ack || _policy.force_acks) {
- wait_for_ack(request, timeout_time);
+ wait_for_ack(request);
}
}
@@ -236,7 +210,6 @@ public:
if (rx_ctrl.is_ack) {
// Function to process a response with no sequence errors
auto process_correct_response = [this, rx_ctrl]() {
- std::unique_lock<std::mutex> lock(_mutex);
response_status_t resp_status = RESP_VALID;
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
@@ -252,7 +225,6 @@ public:
};
// Function to process a response with sequence errors
auto process_incorrect_response = [this]() {
- std::unique_lock<std::mutex> lock(_mutex);
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
_buff_free_cond.notify_one();
@@ -266,6 +238,7 @@ public:
};
// Peek at the request queue to check the expected sequence number
+ std::unique_lock<std::mutex> lock(_mutex);
int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num);
if (seq_num_diff == 0) { // No sequence error
process_correct_response();
@@ -365,7 +338,7 @@ private:
//! Returns whether or not we have a timed command queued
bool check_timed_in_queue() const
{
- for (auto pyld : _req_queue) {
+ for (const auto& pyld : _req_queue) {
if (pyld.has_timestamp()) {
return true;
}
@@ -377,8 +350,7 @@ private:
const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
uint32_t address,
const std::vector<uint32_t>& data_vtr,
- const uhd::time_spec_t& time_spec,
- const steady_clock::time_point& timeout_time)
+ const uhd::time_spec_t& time_spec)
{
if (!_client_clk.is_running()) {
throw uhd::system_error("Ctrlport client clock is not running");
@@ -393,6 +365,8 @@ private:
timestamp = time_spec.to_ticks(_timebase_clk.get_freq());
}
+ std::unique_lock<std::mutex> lock(_mutex);
+
// Assemble the control payload
ctrl_payload tx_ctrl;
tx_ctrl.dst_port = _local_port;
@@ -407,8 +381,6 @@ private:
tx_ctrl.op_code = op_code;
tx_ctrl.status = CMD_OKAY;
- std::unique_lock<std::mutex> lock(_mutex);
-
// Perform flow control
// If there is no room in the downstream buffer, then wait until the timeout
size_t pyld_size = get_payload_size(tx_ctrl);
@@ -420,11 +392,13 @@ private:
- (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
};
if (!buff_not_full()) {
- // If we're sending a timed command or if we have a timed command in the
- // queue, use the MASSIVE_TIMEOUT instead
- auto timed_timeout =
- (check_timed_in_queue() ? start_timeout(MASSIVE_TIMEOUT) : timeout_time);
- if (not _buff_free_cond.wait_until(lock, timed_timeout, buff_not_full)) {
+ // If there is a timed command in the queue, use the
+ // MASSIVE_TIMEOUT instead
+ auto timeout_time = start_timeout(check_timed_in_queue() ?
+ MASSIVE_TIMEOUT :
+ _policy.timeout);
+
+ if (not _buff_free_cond.wait_until(lock, timeout_time, buff_not_full)) {
throw uhd::op_timeout(
"Control operation timed out waiting for space in command buffer");
}
@@ -440,14 +414,19 @@ private:
}
//! Waits for and returns the ACK for the specified request
- const ctrl_payload wait_for_ack(
- const ctrl_payload& request, const steady_clock::time_point& timeout_time)
+ const ctrl_payload wait_for_ack(const ctrl_payload& request)
{
auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
while (true) {
std::unique_lock<std::mutex> lock(_mutex);
// Wait until there is a response in the response queue
if (!resp_ready()) {
+ // If we're waiting for a timed command or if we have a
+ // command in the queue, use the MASSIVE_TIMEOUT instead
+ auto timeout_time = start_timeout(check_timed_in_queue() ?
+ MASSIVE_TIMEOUT :
+ _policy.timeout);
+
if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
throw uhd::op_timeout("Control operation timed out waiting for ACK");
}