| author | Martin Braun <martin.braun@ettus.com> | 2019-07-17 15:27:18 -0700 |
|---|---|---|
| committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:30 -0800 |
| commit | 080f406977cdd91ab8d7998cf79fc2be6339f8c7 (patch) | |
| tree | bbe38b9fb8d01234a3503a9c3cc62e903cb2af4a | |
| parent | c8b9a07c8d34123cb4e286decc75b875d675e98c (diff) | |
rfnoc: ctrlport_endpoint: Audit locks, lock more selectively
The existing implementation would lock indiscriminately, causing a deadlock
when the async message handler would try to call poke32().
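
For context, a minimal sketch of the failure mode. This is illustrative code, not UHD's: Endpoint, on_message(), and poke() are hypothetical stand-ins for ctrlport_endpoint, handle_recv(), and poke32().

    // Illustrative only: a non-recursive std::mutex held across a user
    // callback deadlocks when the callback re-enters the locking object.
    #include <functional>
    #include <iostream>
    #include <mutex>

    class Endpoint { // hypothetical stand-in for ctrlport_endpoint
    public:
        void set_handler(std::function<void()> h) { _handler = std::move(h); }

        // Like the old handle_recv(): takes the lock, then runs the handler.
        void on_message()
        {
            std::unique_lock<std::mutex> lock(_mutex); // held across callback
            if (_handler) {
                _handler(); // handler calls back into poke() below
            }
        }

        // Like poke32(): wants the same mutex.
        void poke()
        {
            // Undefined behavior with std::mutex (in practice: a hang),
            // because this thread already owns _mutex via on_message().
            std::unique_lock<std::mutex> lock(_mutex);
            std::cout << "poke" << std::endl;
        }

    private:
        std::mutex _mutex;
        std::function<void()> _handler;
    };

    int main()
    {
        Endpoint ep;
        ep.set_handler([&ep]() { ep.poke(); });
        ep.on_message(); // never completes
    }

The patch below avoids this by dropping the long-held lock in handle_recv() and taking _mutex only around the accesses that actually need it.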
-rw-r--r-- | host/lib/rfnoc/ctrlport_endpoint.cpp | 19 |
1 file changed, 14 insertions, 5 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
index a618eaca2..267ff5d71 100644
--- a/host/lib/rfnoc/ctrlport_endpoint.cpp
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -193,11 +193,13 @@ public:
     virtual void register_async_msg_handler(async_msg_callback_t callback_f)
     {
+        std::unique_lock<std::mutex> lock(_mutex);
         _handle_async_msg = callback_f;
     }
 
     virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)
     {
+        std::unique_lock<std::mutex> lock(_mutex);
         if (name == "default") {
             _policy.timeout = args.cast<double>("timeout", DEFAULT_TIMEOUT);
             _policy.force_acks = DEFAULT_FORCE_ACKS;
@@ -209,10 +211,10 @@ public:
 
     virtual void handle_recv(const ctrl_payload& rx_ctrl)
     {
-        std::unique_lock<std::mutex> lock(_mutex);
         if (rx_ctrl.is_ack) {
             // Function to process a response with no sequence errors
             auto process_correct_response = [this, rx_ctrl]() {
+                std::unique_lock<std::mutex> lock(_mutex);
                 response_status_t resp_status = RESP_VALID;
                 // Grant flow control credits
                 _buff_occupied -= get_payload_size(_req_queue.front());
@@ -228,6 +230,7 @@ public:
             };
             // Function to process a response with sequence errors
             auto process_incorrect_response = [this]() {
+                std::unique_lock<std::mutex> lock(_mutex);
                 // Grant flow control credits
                 _buff_occupied -= get_payload_size(_req_queue.front());
                 _buff_free_cond.notify_one();
@@ -286,7 +289,11 @@ public:
             tx_ctrl.is_ack = true;
             tx_ctrl.src_epid = _my_epid;
             tx_ctrl.status = status;
-            _handle_send(tx_ctrl, _policy.timeout);
+            const auto timeout = [&]() {
+                std::unique_lock<std::mutex> lock(_mutex);
+                return _policy.timeout;
+            }();
+            _handle_send(tx_ctrl, timeout);
         } catch (...) {
             UHD_LOG_ERROR("CTRLEP",
                 "Encountered an error sending a response for an async message");
@@ -296,11 +303,13 @@ public:
 
     virtual uint16_t get_src_epid() const
     {
+        // Is const, does not require a mutex
         return _my_epid;
     }
 
     virtual uint16_t get_port_num() const
     {
+        // Is const, does not require a mutex
         return _local_port;
     }
@@ -324,7 +333,6 @@ private:
         const uhd::time_spec_t& time_spec,
         const steady_clock::time_point& timeout_time)
     {
-        std::unique_lock<std::mutex> lock(_mutex);
 
         if (!_client_clk.is_running()) {
             throw uhd::system_error("Ctrlport client clock is not running");
@@ -353,6 +361,8 @@ private:
         tx_ctrl.op_code = op_code;
         tx_ctrl.status = CMD_OKAY;
 
+        std::unique_lock<std::mutex> lock(_mutex);
+
         // Perform flow control
         // If there is no room in the downstream buffer, then wait until the timeout
         size_t pyld_size = get_payload_size(tx_ctrl);
@@ -383,10 +393,9 @@ private:
     const ctrl_payload wait_for_ack(
         const ctrl_payload& request, const steady_clock::time_point& timeout_time)
     {
-        std::unique_lock<std::mutex> lock(_mutex);
-
         auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
         while (true) {
+            std::unique_lock<std::mutex> lock(_mutex);
             // Wait until there is a response in the response queue
             if (!resp_ready()) {
                 if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
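
The timeout hunk above shows the patch's central idiom: hold the lock only long enough to copy the shared state, then perform the potentially blocking call unlocked. A standalone sketch of that pattern follows; Sender, Policy, and _handle_send() are illustrative names mirroring the patch, not the UHD API.

    // Illustrative only: scope the lock with an immediately-invoked lambda
    // so the blocking send happens without the mutex held.
    #include <mutex>

    struct Policy {
        double timeout = 0.1; // seconds; shared state guarded by _mutex
    };

    class Sender {
    public:
        void send_response()
        {
            // The lambda runs immediately; its unique_lock is destroyed as
            // soon as the copy of _policy.timeout is returned.
            const auto timeout = [&]() {
                std::unique_lock<std::mutex> lock(_mutex);
                return _policy.timeout;
            }();
            // No lock held here: _handle_send() may block or re-enter this
            // object without risking the deadlock sketched earlier.
            _handle_send(timeout);
        }

    private:
        void _handle_send(double /*timeout*/) { /* transport send */ }

        std::mutex _mutex;
        Policy _policy;
    };

The wait_for_ack() change follows the same reasoning: re-acquiring the lock at the top of each loop iteration keeps it paired with the condition-variable wait and the queue access, rather than spanning the entire call.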