aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/ctrlport_endpoint.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/ctrlport_endpoint.cpp')
-rw-r--r--host/lib/rfnoc/ctrlport_endpoint.cpp19
1 files changed, 14 insertions, 5 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
index a618eaca2..267ff5d71 100644
--- a/host/lib/rfnoc/ctrlport_endpoint.cpp
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -193,11 +193,13 @@ public:
virtual void register_async_msg_handler(async_msg_callback_t callback_f)
{
+ std::unique_lock<std::mutex> lock(_mutex);
_handle_async_msg = callback_f;
}
virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)
{
+ std::unique_lock<std::mutex> lock(_mutex);
if (name == "default") {
_policy.timeout = args.cast<double>("timeout", DEFAULT_TIMEOUT);
_policy.force_acks = DEFAULT_FORCE_ACKS;
@@ -209,10 +211,10 @@ public:
virtual void handle_recv(const ctrl_payload& rx_ctrl)
{
- std::unique_lock<std::mutex> lock(_mutex);
if (rx_ctrl.is_ack) {
// Function to process a response with no sequence errors
auto process_correct_response = [this, rx_ctrl]() {
+ std::unique_lock<std::mutex> lock(_mutex);
response_status_t resp_status = RESP_VALID;
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
@@ -228,6 +230,7 @@ public:
};
// Function to process a response with sequence errors
auto process_incorrect_response = [this]() {
+ std::unique_lock<std::mutex> lock(_mutex);
// Grant flow control credits
_buff_occupied -= get_payload_size(_req_queue.front());
_buff_free_cond.notify_one();
@@ -286,7 +289,11 @@ public:
tx_ctrl.is_ack = true;
tx_ctrl.src_epid = _my_epid;
tx_ctrl.status = status;
- _handle_send(tx_ctrl, _policy.timeout);
+ const auto timeout = [&]() {
+ std::unique_lock<std::mutex> lock(_mutex);
+ return _policy.timeout;
+ }();
+ _handle_send(tx_ctrl, timeout);
} catch (...) {
UHD_LOG_ERROR("CTRLEP",
"Encountered an error sending a response for an async message");
@@ -296,11 +303,13 @@ public:
virtual uint16_t get_src_epid() const
{
+ // Is const, does not require a mutex
return _my_epid;
}
virtual uint16_t get_port_num() const
{
+ // Is const, does not require a mutex
return _local_port;
}
@@ -324,7 +333,6 @@ private:
const uhd::time_spec_t& time_spec,
const steady_clock::time_point& timeout_time)
{
- std::unique_lock<std::mutex> lock(_mutex);
if (!_client_clk.is_running()) {
throw uhd::system_error("Ctrlport client clock is not running");
@@ -353,6 +361,8 @@ private:
tx_ctrl.op_code = op_code;
tx_ctrl.status = CMD_OKAY;
+ std::unique_lock<std::mutex> lock(_mutex);
+
// Perform flow control
// If there is no room in the downstream buffer, then wait until the timeout
size_t pyld_size = get_payload_size(tx_ctrl);
@@ -383,10 +393,9 @@ private:
const ctrl_payload wait_for_ack(
const ctrl_payload& request, const steady_clock::time_point& timeout_time)
{
- std::unique_lock<std::mutex> lock(_mutex);
-
auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
while (true) {
+ std::unique_lock<std::mutex> lock(_mutex);
// Wait until there is a response in the response queue
if (!resp_ready()) {
if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {