From 7b71a9542c28575b61f01e58209a48aefaa143c9 Mon Sep 17 00:00:00 2001 From: Ciro Nishiguchi Date: Sat, 17 Oct 2020 14:28:34 -0500 Subject: rfnoc: Fix thread unsafe accesses in ctrlport --- host/lib/rfnoc/ctrlport_endpoint.cpp | 91 ++++++++++++++---------------------- 1 file changed, 35 insertions(+), 56 deletions(-) (limited to 'host') diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp index 21a07b0d6..de5fa00b7 100644 --- a/host/lib/rfnoc/ctrlport_endpoint.cpp +++ b/host/lib/rfnoc/ctrlport_endpoint.cpp @@ -64,14 +64,11 @@ public: uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP, bool ack = false) { - // Compute transaction expiration time - auto timeout_time = start_timeout(_policy.timeout); // Send request - auto request = - send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time); + auto request = send_request_packet(OP_WRITE, addr, {data}, timestamp); // Optionally wait for an ACK if (ack || _policy.force_acks) { - wait_for_ack(request, timeout_time); + wait_for_ack(request); } } @@ -104,14 +101,11 @@ public: } /* TODO: Uncomment when the atomic block poke is implemented in the FPGA - // Compute transaction expiration time - auto timeout_time = start_timeout(_policy.timeout); // Send request - auto request = send_request_packet( - OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time); + auto request = send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp); // Optionally wait for an ACK if (ack || _policy.force_acks) { - wait_for_ack(request, timeout_time); + wait_for_ack(request); } */ } @@ -119,16 +113,11 @@ public: virtual uint32_t peek32( uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) { - // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed - // command is in the queue - auto timeout_time = - start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout); - // Send request - auto request = - send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time); + auto request = send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp); + // Wait for an ACK - auto response = wait_for_ack(request, timeout_time); + auto response = wait_for_ack(request); return response.data_vtr[0]; } @@ -144,19 +133,14 @@ public: return values; /* TODO: Uncomment when the atomic block peek is implemented in the FPGA - // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed - // command is in the queue - auto timeout_time = start_timeout( - check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout - ); // Send request auto request = send_request_packet(OP_READ, first_addr, std::vector(length, 0), - timestamp, - timeout_time); + timestamp); + // Wait for an ACK - auto response = wait_for_ack(request, timeout_time); + auto response = wait_for_ack(request); return response.data_vtr; */ } @@ -171,39 +155,29 @@ public: // TODO: Uncomment when this is implemented in the FPGA throw uhd::not_implemented_error("Control poll not implemented in the FPGA"); - // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed - // command is in the queue - auto timeout_time = - start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout); - // Send request auto request = send_request_packet(OP_POLL, addr, {data, mask, static_cast(timeout.to_ticks(_client_clk.get_freq()))}, - timestamp, - timeout_time); + timestamp); + // Optionally wait for an ACK if (ack || _policy.force_acks) { - wait_for_ack(request, timeout_time); + wait_for_ack(request); } } virtual void sleep(uhd::time_spec_t duration, bool ack = false) { - // Compute transaction expiration time, use MASSIVE_TIMEOUT if a timed - // command is in the queue - auto timeout_time = - start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout); - // Send request auto request = send_request_packet(OP_SLEEP, 0, {static_cast(duration.to_ticks(_client_clk.get_freq()))}, - uhd::time_spec_t::ASAP, - timeout_time); + uhd::time_spec_t::ASAP); + // Optionally wait for an ACK if (ack || _policy.force_acks) { - wait_for_ack(request, timeout_time); + wait_for_ack(request); } } @@ -236,7 +210,6 @@ public: if (rx_ctrl.is_ack) { // Function to process a response with no sequence errors auto process_correct_response = [this, rx_ctrl]() { - std::unique_lock lock(_mutex); response_status_t resp_status = RESP_VALID; // Grant flow control credits _buff_occupied -= get_payload_size(_req_queue.front()); @@ -252,7 +225,6 @@ public: }; // Function to process a response with sequence errors auto process_incorrect_response = [this]() { - std::unique_lock lock(_mutex); // Grant flow control credits _buff_occupied -= get_payload_size(_req_queue.front()); _buff_free_cond.notify_one(); @@ -266,6 +238,7 @@ public: }; // Peek at the request queue to check the expected sequence number + std::unique_lock lock(_mutex); int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num); if (seq_num_diff == 0) { // No sequence error process_correct_response(); @@ -365,7 +338,7 @@ private: //! Returns whether or not we have a timed command queued bool check_timed_in_queue() const { - for (auto pyld : _req_queue) { + for (const auto& pyld : _req_queue) { if (pyld.has_timestamp()) { return true; } @@ -377,8 +350,7 @@ private: const ctrl_payload send_request_packet(ctrl_opcode_t op_code, uint32_t address, const std::vector& data_vtr, - const uhd::time_spec_t& time_spec, - const steady_clock::time_point& timeout_time) + const uhd::time_spec_t& time_spec) { if (!_client_clk.is_running()) { throw uhd::system_error("Ctrlport client clock is not running"); @@ -393,6 +365,8 @@ private: timestamp = time_spec.to_ticks(_timebase_clk.get_freq()); } + std::unique_lock lock(_mutex); + // Assemble the control payload ctrl_payload tx_ctrl; tx_ctrl.dst_port = _local_port; @@ -407,8 +381,6 @@ private: tx_ctrl.op_code = op_code; tx_ctrl.status = CMD_OKAY; - std::unique_lock lock(_mutex); - // Perform flow control // If there is no room in the downstream buffer, then wait until the timeout size_t pyld_size = get_payload_size(tx_ctrl); @@ -420,11 +392,13 @@ private: - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs)); }; if (!buff_not_full()) { - // If we're sending a timed command or if we have a timed command in the - // queue, use the MASSIVE_TIMEOUT instead - auto timed_timeout = - (check_timed_in_queue() ? start_timeout(MASSIVE_TIMEOUT) : timeout_time); - if (not _buff_free_cond.wait_until(lock, timed_timeout, buff_not_full)) { + // If there is a timed command in the queue, use the + // MASSIVE_TIMEOUT instead + auto timeout_time = start_timeout(check_timed_in_queue() ? + MASSIVE_TIMEOUT : + _policy.timeout); + + if (not _buff_free_cond.wait_until(lock, timeout_time, buff_not_full)) { throw uhd::op_timeout( "Control operation timed out waiting for space in command buffer"); } @@ -440,14 +414,19 @@ private: } //! Waits for and returns the ACK for the specified request - const ctrl_payload wait_for_ack( - const ctrl_payload& request, const steady_clock::time_point& timeout_time) + const ctrl_payload wait_for_ack(const ctrl_payload& request) { auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); }; while (true) { std::unique_lock lock(_mutex); // Wait until there is a response in the response queue if (!resp_ready()) { + // If we're waiting for a timed command or if we have a + // command in the queue, use the MASSIVE_TIMEOUT instead + auto timeout_time = start_timeout(check_timed_in_queue() ? + MASSIVE_TIMEOUT : + _policy.timeout); + if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) { throw uhd::op_timeout("Control operation timed out waiting for ACK"); } -- cgit v1.2.3