diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/lib/rfnoc/ctrlport_endpoint.cpp | 19 | 
1 files changed, 14 insertions, 5 deletions
| diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp index a618eaca2..267ff5d71 100644 --- a/host/lib/rfnoc/ctrlport_endpoint.cpp +++ b/host/lib/rfnoc/ctrlport_endpoint.cpp @@ -193,11 +193,13 @@ public:      virtual void register_async_msg_handler(async_msg_callback_t callback_f)      { +        std::unique_lock<std::mutex> lock(_mutex);          _handle_async_msg = callback_f;      }      virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)      { +        std::unique_lock<std::mutex> lock(_mutex);          if (name == "default") {              _policy.timeout    = args.cast<double>("timeout", DEFAULT_TIMEOUT);              _policy.force_acks = DEFAULT_FORCE_ACKS; @@ -209,10 +211,10 @@ public:      virtual void handle_recv(const ctrl_payload& rx_ctrl)      { -        std::unique_lock<std::mutex> lock(_mutex);          if (rx_ctrl.is_ack) {              // Function to process a response with no sequence errors              auto process_correct_response = [this, rx_ctrl]() { +                std::unique_lock<std::mutex> lock(_mutex);                  response_status_t resp_status = RESP_VALID;                  // Grant flow control credits                  _buff_occupied -= get_payload_size(_req_queue.front()); @@ -228,6 +230,7 @@ public:              };              // Function to process a response with sequence errors              auto process_incorrect_response = [this]() { +                std::unique_lock<std::mutex> lock(_mutex);                  // Grant flow control credits                  _buff_occupied -= get_payload_size(_req_queue.front());                  _buff_free_cond.notify_one(); @@ -286,7 +289,11 @@ public:                  tx_ctrl.is_ack   = true;                  tx_ctrl.src_epid = _my_epid;                  tx_ctrl.status   = status; -                _handle_send(tx_ctrl, _policy.timeout); +                const auto timeout = [&]() { +                    std::unique_lock<std::mutex> lock(_mutex); +                    return _policy.timeout; +                }(); +                _handle_send(tx_ctrl, timeout);              } catch (...) {                  UHD_LOG_ERROR("CTRLEP",                      "Encountered an error sending a response for an async message"); @@ -296,11 +303,13 @@ public:      virtual uint16_t get_src_epid() const      { +        // Is const, does not require a mutex          return _my_epid;      }      virtual uint16_t get_port_num() const      { +        // Is const, does not require a mutex          return _local_port;      } @@ -324,7 +333,6 @@ private:          const uhd::time_spec_t& time_spec,          const steady_clock::time_point& timeout_time)      { -        std::unique_lock<std::mutex> lock(_mutex);          if (!_client_clk.is_running()) {              throw uhd::system_error("Ctrlport client clock is not running"); @@ -353,6 +361,8 @@ private:          tx_ctrl.op_code     = op_code;          tx_ctrl.status      = CMD_OKAY; +        std::unique_lock<std::mutex> lock(_mutex); +          // Perform flow control          // If there is no room in the downstream buffer, then wait until the timeout          size_t pyld_size   = get_payload_size(tx_ctrl); @@ -383,10 +393,9 @@ private:      const ctrl_payload wait_for_ack(          const ctrl_payload& request, const steady_clock::time_point& timeout_time)      { -        std::unique_lock<std::mutex> lock(_mutex); -          auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };          while (true) { +            std::unique_lock<std::mutex> lock(_mutex);              // Wait until there is a response in the response queue              if (!resp_ready()) {                  if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) { | 
