diff options
-rw-r--r-- | host/lib/rfnoc/ctrlport_endpoint.cpp | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp index a618eaca2..267ff5d71 100644 --- a/host/lib/rfnoc/ctrlport_endpoint.cpp +++ b/host/lib/rfnoc/ctrlport_endpoint.cpp @@ -193,11 +193,13 @@ public: virtual void register_async_msg_handler(async_msg_callback_t callback_f) { + std::unique_lock<std::mutex> lock(_mutex); _handle_async_msg = callback_f; } virtual void set_policy(const std::string& name, const uhd::device_addr_t& args) { + std::unique_lock<std::mutex> lock(_mutex); if (name == "default") { _policy.timeout = args.cast<double>("timeout", DEFAULT_TIMEOUT); _policy.force_acks = DEFAULT_FORCE_ACKS; @@ -209,10 +211,10 @@ public: virtual void handle_recv(const ctrl_payload& rx_ctrl) { - std::unique_lock<std::mutex> lock(_mutex); if (rx_ctrl.is_ack) { // Function to process a response with no sequence errors auto process_correct_response = [this, rx_ctrl]() { + std::unique_lock<std::mutex> lock(_mutex); response_status_t resp_status = RESP_VALID; // Grant flow control credits _buff_occupied -= get_payload_size(_req_queue.front()); @@ -228,6 +230,7 @@ public: }; // Function to process a response with sequence errors auto process_incorrect_response = [this]() { + std::unique_lock<std::mutex> lock(_mutex); // Grant flow control credits _buff_occupied -= get_payload_size(_req_queue.front()); _buff_free_cond.notify_one(); @@ -286,7 +289,11 @@ public: tx_ctrl.is_ack = true; tx_ctrl.src_epid = _my_epid; tx_ctrl.status = status; - _handle_send(tx_ctrl, _policy.timeout); + const auto timeout = [&]() { + std::unique_lock<std::mutex> lock(_mutex); + return _policy.timeout; + }(); + _handle_send(tx_ctrl, timeout); } catch (...) { UHD_LOG_ERROR("CTRLEP", "Encountered an error sending a response for an async message"); @@ -296,11 +303,13 @@ public: virtual uint16_t get_src_epid() const { + // Is const, does not require a mutex return _my_epid; } virtual uint16_t get_port_num() const { + // Is const, does not require a mutex return _local_port; } @@ -324,7 +333,6 @@ private: const uhd::time_spec_t& time_spec, const steady_clock::time_point& timeout_time) { - std::unique_lock<std::mutex> lock(_mutex); if (!_client_clk.is_running()) { throw uhd::system_error("Ctrlport client clock is not running"); @@ -353,6 +361,8 @@ private: tx_ctrl.op_code = op_code; tx_ctrl.status = CMD_OKAY; + std::unique_lock<std::mutex> lock(_mutex); + // Perform flow control // If there is no room in the downstream buffer, then wait until the timeout size_t pyld_size = get_payload_size(tx_ctrl); @@ -383,10 +393,9 @@ private: const ctrl_payload wait_for_ack( const ctrl_payload& request, const steady_clock::time_point& timeout_time) { - std::unique_lock<std::mutex> lock(_mutex); - auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); }; while (true) { + std::unique_lock<std::mutex> lock(_mutex); // Wait until there is a response in the response queue if (!resp_ready()) { if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) { |