diff options
| -rw-r--r-- | host/lib/rfnoc/ctrlport_endpoint.cpp | 291 | 
1 files changed, 170 insertions, 121 deletions
| diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp index 3a569fae1..cad06f5e2 100644 --- a/host/lib/rfnoc/ctrlport_endpoint.cpp +++ b/host/lib/rfnoc/ctrlport_endpoint.cpp @@ -14,8 +14,7 @@  #include <deque>  #include <mutex>  #include <numeric> -#include <queue> - +#include <set>  using namespace uhd;  using namespace uhd::rfnoc; @@ -64,11 +63,8 @@ public:          uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,          bool ack                   = false) override      { -        // Send request -        auto request = send_request_packet(OP_WRITE, addr, {data}, timestamp); -        // Optionally wait for an ACK -        const bool require_ack = ack || _policy.force_acks; -        wait_for_ack(request, require_ack); +        // Send request and optionally wait for an ACK +        send_request_packet(OP_WRITE, addr, {data}, timestamp, ack);      }      void multi_poke32(const std::vector<uint32_t> addrs, @@ -100,23 +96,19 @@ public:          }          /* TODO: Uncomment when the atomic block poke is implemented in the FPGA -        // Send request -        auto request = send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp); -        // Optionally wait for an ACK -        const bool require_ack = ack || _policy.force_acks; -        wait_for_ack(request, require_ack); +        // Send request and optionally want for an ACK +        send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp, ack);          */      }      uint32_t peek32(          uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) override      { -        // Send request -        auto request = send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp); - -        // Wait for an ACK -        auto response = wait_for_ack(request, true).get(); -        return response.data_vtr[0]; +        // Send request and wait for an ACK +        boost::optional<ctrl_payload> response; +        std::tie(std::ignore, response) = +            send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp); +        return response.get().data_vtr[0];      }      std::vector<uint32_t> block_peek32(uint32_t first_addr, @@ -131,15 +123,14 @@ public:          return values;          /* TODO: Uncomment when the atomic block peek is implemented in the FPGA -        // Send request -        auto request = send_request_packet(OP_READ, +        // Send request and wait for an ACK +        boost::optional<ctrl_payload> response; +        std::tie(std::ignore, response) = send_request_packet(OP_READ,              first_addr,              std::vector<uint32_t>(length, 0),              timestamp); -        // Wait for an ACK -        auto response = wait_for_ack(request, true).get(); -        return response.data_vtr; +        return response.get().data_vtr;          */      } @@ -153,30 +144,24 @@ public:          // TODO: Uncomment when this is implemented in the FPGA          throw uhd::not_implemented_error("Control poll not implemented in the FPGA"); -        // Send request -        auto request = send_request_packet(OP_POLL, +        // Send request and optionally wait for an ACK +        send_request_packet(OP_POLL,              addr,              {data,                  mask,                  static_cast<uint32_t>(timeout.to_ticks(_timebase_clk.get_freq()))}, -            timestamp); - -        // Optionally wait for an ACK -        const bool require_ack = ack || _policy.force_acks; -        wait_for_ack(request, require_ack); +            timestamp, +            ack);      }      void sleep(uhd::time_spec_t duration, bool ack = false) override      { -        // Send request -        auto request = send_request_packet(OP_SLEEP, +        // Send request and optionally wait for an ACK +        send_request_packet(OP_SLEEP,              0,              {static_cast<uint32_t>(duration.to_ticks(_timebase_clk.get_freq()))}, -            uhd::time_spec_t::ASAP); - -        // Optionally wait for an ACK -        const bool require_ack = ack || _policy.force_acks; -        wait_for_ack(request, require_ack); +            uhd::time_spec_t::ASAP, +            ack);      }      void register_async_msg_validator(async_msg_validator_t callback_f) override @@ -217,20 +202,44 @@ public:                  }                  // Pop the request from the queue                  _req_queue.pop_front(); -                // Push the response into the response queue -                _resp_queue.push(std::make_tuple(rx_ctrl, resp_status)); -                _resp_ready_cond.notify_one(); +                // Push the response into the response queue if and only if +                // the client wanted an ACK for this packet +                wanted_ack_key request_key{ +                    rx_ctrl.seq_num, rx_ctrl.op_code, rx_ctrl.address}; +                if (_wanted_acks.count(request_key)) { +                    _wanted_acks.erase(request_key); +                    _resp_queue.push_back(std::make_tuple(rx_ctrl, resp_status)); +                    _resp_ready_cond.notify_all(); +                } else { +                    // If the client didn't want an ACK for this request, but +                    // the response indicated an error, then provide some +                    // feedback +                    if (rx_ctrl.status != CMD_OKAY) { +                        log_response_packet_error(rx_ctrl); +                    } +                }              };              // Function to process a response with sequence errors              auto process_incorrect_response = [this]() {                  // Grant flow control credits                  _buff_occupied -= get_payload_size(_req_queue.front());                  _buff_free_cond.notify_one(); -                // Push a fabricated response into the response queue +                // Push a fabricated response into the response queue only +                // if the client was waiting on an ACK for the original +                // request whose response was lost                  ctrl_payload resp(_req_queue.front()); -                resp.is_ack = true; -                _resp_queue.push(std::make_tuple(resp, RESP_DROPPED)); -                _resp_ready_cond.notify_one(); +                wanted_ack_key request_key{resp.seq_num, resp.op_code, resp.address}; +                if (_wanted_acks.count(request_key)) { +                    _wanted_acks.erase(request_key); +                    resp.is_ack = true; +                    _resp_queue.push_back(std::make_tuple(resp, RESP_DROPPED)); +                    _resp_ready_cond.notify_all(); +                } else { +                    // If the client did not want an ACK for this request, but +                    // we fabricated a response due to a sequence error, then +                    // provide feedback +                    log_dropped_packet(resp); +                }                  // Pop the request from the queue                  _req_queue.pop_front();              }; @@ -321,6 +330,9 @@ public:      }  private: +    //! The software status (different from the transaction status) of the response +    enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR }; +      //! Returns the length of the control payload in 32-bit words      inline static size_t get_payload_size(const ctrl_payload& payload)      { @@ -344,11 +356,14 @@ private:          return false;      } -    //! Sends a request control packet to a remote device -    const ctrl_payload send_request_packet(ctrl_opcode_t op_code, +    //! Sends a request control packet to a remote device, optionally waiting +    // for an ACK, and returns any response if applicable +    const std::pair<ctrl_payload, boost::optional<ctrl_payload>> send_request_packet( +        ctrl_opcode_t op_code,          uint32_t address,          const std::vector<uint32_t>& data_vtr, -        const uhd::time_spec_t& time_spec) +        const uhd::time_spec_t& time_spec, +        const bool require_ack = true)      {          if (!_client_clk.is_running()) {              throw uhd::system_error("Ctrlport client clock is not running"); @@ -387,7 +402,7 @@ private:              // If we can fit the current request in the queue then we can proceed              return (_buff_occupied + pyld_size)                     <= (_buff_capacity -                          - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs)); +                       - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));          };          if (!buff_not_full()) {              // If there is a timed command in the queue, use the @@ -403,90 +418,122 @@ private:          _buff_occupied += pyld_size;          _req_queue.push_back(tx_ctrl); -        // Send the payload as soon as there is room in the buffer -        _handle_send(tx_ctrl, _policy.timeout); -        _tx_seq_num = (_tx_seq_num + 1) % 64; +        if (require_ack || _policy.force_acks) { +            // If the client wants an ACK for this request, make note of its +            // details in a set. This set will be consulted when responses are +            // received. +            wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address}; +            _wanted_acks.insert(ack_key); +        } -        return tx_ctrl; +        try { +            // Send the payload as soon as there is room in the buffer +            _handle_send(tx_ctrl, _policy.timeout); +            _tx_seq_num = (_tx_seq_num + 1) % 64; + +            if (require_ack || _policy.force_acks) { +                auto response = wait_for_ack(tx_ctrl, lock); +                return {tx_ctrl, response}; +            } else { +                return {tx_ctrl, {}}; +            } +        } catch (...) { +            // Something went wrong while trying to send the request. +            // Remove the entry from the ACK tracking set. +            wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address}; +            _wanted_acks.erase(ack_key); +            throw; +        }      }      //! Waits for and returns the ACK for the specified request -    // -    // \param request The request for which we are awaiting the response -    // \param require_ack A Boolean flag which indicates if we really need that -    //                    response. If false, we reduce the timeout to zero -    //                    and return an empty control payload if we can't find -    //                    the relevant ACK. This can be used to help clear the -    //                    response queue without waiting. -    // \returns The response payload corresponding to \p requst. If \p require_ack -    //          is false, this may also be an empty ctrl_payload object with no -    //          meaningful content. -    // \throws uhd::op_timeout if require_ack is true, and we exceed the timeout -    //         set by the current policy. Throws various other uhd::rfnoc_error -    //         when there was a communication issue (see the code for details). -    const boost::optional<ctrl_payload> wait_for_ack( -        const ctrl_payload& request, const bool require_ack) +    const ctrl_payload wait_for_ack( +        const ctrl_payload& request, std::unique_lock<std::mutex>& lock)      { -        auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); }; -        while (true) { -            std::unique_lock<std::mutex> lock(_mutex); -            // Wait until there is a response in the response queue -            if (!resp_ready()) { -                if (!require_ack) { -                    return {}; -                } -                // If we're waiting for a timed command or if we have a -                // command in the queue, use the MASSIVE_TIMEOUT instead -                auto timeout_time = start_timeout( -                    check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout); - -                if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) { -                    throw uhd::op_timeout("Control operation timed out waiting for ACK"); -                } -            } +        // Determine the timepoint (now plus some offset, depending on +        // whether we're waiting for a timed command or if we have a +        // command in the queue) at which point we flag a timeout. +        const auto timeout_time = +            start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout); + +        // Check the queue for the response for the specific request, looping +        // until it's found or the timeout timepoint has been reached. +        do {              // Extract the response packet              ctrl_payload rx_ctrl;              response_status_t resp_status; -            std::tie(rx_ctrl, resp_status) = _resp_queue.front(); -            _resp_queue.pop(); -            // Check if this is the response meant for the request -            // Filter by op_code, address and seq_num -            if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code -                && rx_ctrl.address == request.address) { -                // If we find the response for our request, and the client -                // has not requested acknowledgements, then simply return, -                // ignoring any error status in the response. This restores -                // the pre-0caed5529 behavior. -                if (!require_ack) { -                    return {}; -                } -                // Validate transaction status -                if (rx_ctrl.status == CMD_CMDERR) { -                    throw uhd::op_failed("Control operation returned a failing status"); -                } else if (rx_ctrl.status == CMD_TSERR) { -                    throw uhd::op_timerr("Control operation returned a timestamp error"); -                } -                // Check data vector size -                if (rx_ctrl.data_vtr.empty()) { -                    throw uhd::op_failed( -                        "Control operation returned a malformed response"); +            // Iterate the response queue, looking for the response that is +            // meant for the request. When found, it is removed from the queue. +            // Other elements are left undisturbed. +            for (auto q_iterator = _resp_queue.begin(); q_iterator != _resp_queue.end(); +                 q_iterator++) { +                std::tie(rx_ctrl, resp_status) = *q_iterator; +                // Check if this is the response meant for the request +                // Filter by op_code, address and seq_num +                if (rx_ctrl.seq_num == request.seq_num +                    && rx_ctrl.op_code == request.op_code +                    && rx_ctrl.address == request.address) { +                    // Remove the response from the queue +                    _resp_queue.erase(q_iterator); + +                    // Validate transaction status, either returning the +                    // response if everything checks out, or throwing an +                    // exception if the status indicates an error +                    return validate_ack(rx_ctrl, resp_status);                  } -                // Validate response status -                if (resp_status == RESP_DROPPED) { -                    throw uhd::op_seqerr( -                        "Response for a control transaction was dropped"); -                } else if (resp_status == RESP_RTERR) { -                    throw uhd::op_timerr("Control operation encountered a routing error"); -                } -                return rx_ctrl; -            } else { -                // This response does not belong to the request we passed in. Move on. -                continue;              } +            // If we got here, that means we iterated the queue and did NOT +            // find a matching response to the request that this thread sent. +            // We can't return, because if we're in this function, the caller +            // specifically requested to receive an ACK for their request, +            // and we haven't received it yet. So we must wait until a new +            // request is added to the queue and try again. If we reach the +            // timeout timepoint before a new request is added, then throw the +            // timeout exception. +        } while ( +            _resp_ready_cond.wait_until(lock, timeout_time) != std::cv_status::timeout); + +        throw uhd::op_timeout("Control operation timed out waiting for ACK"); +    } + +    const ctrl_payload validate_ack( +        const ctrl_payload& rx_ctrl, response_status_t resp_status) const +    { +        if (rx_ctrl.status == CMD_CMDERR) { +            throw uhd::op_failed("Control operation returned a failing status"); +        } else if (rx_ctrl.status == CMD_TSERR) { +            throw uhd::op_timerr("Control operation returned a timestamp error");          } +        // Check data vector size +        if (rx_ctrl.data_vtr.empty()) { +            throw uhd::op_failed("Control operation returned a malformed response"); +        } +        // Validate response status +        if (resp_status == RESP_DROPPED) { +            throw uhd::op_seqerr("Response for a control transaction was dropped"); +        } else if (resp_status == RESP_RTERR) { +            throw uhd::op_timerr("Control operation encountered a routing error"); +        } +        return rx_ctrl;      } +    void log_response_packet_error(const ctrl_payload& resp) +    { +        std::string packet = resp.to_string(); +        packet.pop_back(); // Remove the trailing \n +        UHD_LOG_DEBUG("CTRLEP", +            "Control response for ack-less request returned a failing status: " +                << packet); +    } + +    void log_dropped_packet(const ctrl_payload& resp) +    { +        std::string packet = resp.to_string(); +        packet.pop_back(); // Remove the trailing \n +        UHD_LOG_DEBUG( +            "CTRLEP", "Control response for ack-less request was dropped: " << packet); +    }      //! The parameters associated with the policy that governs this object      struct policy_args @@ -494,8 +541,6 @@ private:          double timeout  = DEFAULT_TIMEOUT;          bool force_acks = DEFAULT_FORCE_ACKS;      }; -    //! The software status (different from the transaction status) of the response -    enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };      //! Function to call to send a control packet      const send_fn_t _handle_send; @@ -529,11 +574,15 @@ private:      //! A queue that holds all outstanding requests      std::deque<ctrl_payload> _req_queue;      //! A queue that holds all outstanding responses and their status -    std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue; +    std::deque<std::tuple<ctrl_payload, response_status_t>> _resp_queue;      //! A condition variable that hold the "response is available" condition      std::condition_variable _resp_ready_cond;      //! A mutex to protect all state in this class      std::mutex _mutex; +    //! A set of {opcode, address, sequence numbers} triples associated with +    // request packets for which the client cares about receiving ACKs +    using wanted_ack_key = std::tuple<uint8_t, ctrl_opcode_t, uint32_t>; +    std::set<wanted_ack_key> _wanted_acks;  };  ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send, | 
