diff options
-rw-r--r-- | host/lib/rfnoc/ctrlport_endpoint.cpp | 291 |
1 files changed, 170 insertions, 121 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp index 3a569fae1..cad06f5e2 100644 --- a/host/lib/rfnoc/ctrlport_endpoint.cpp +++ b/host/lib/rfnoc/ctrlport_endpoint.cpp @@ -14,8 +14,7 @@ #include <deque> #include <mutex> #include <numeric> -#include <queue> - +#include <set> using namespace uhd; using namespace uhd::rfnoc; @@ -64,11 +63,8 @@ public: uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP, bool ack = false) override { - // Send request - auto request = send_request_packet(OP_WRITE, addr, {data}, timestamp); - // Optionally wait for an ACK - const bool require_ack = ack || _policy.force_acks; - wait_for_ack(request, require_ack); + // Send request and optionally wait for an ACK + send_request_packet(OP_WRITE, addr, {data}, timestamp, ack); } void multi_poke32(const std::vector<uint32_t> addrs, @@ -100,23 +96,19 @@ public: } /* TODO: Uncomment when the atomic block poke is implemented in the FPGA - // Send request - auto request = send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp); - // Optionally wait for an ACK - const bool require_ack = ack || _policy.force_acks; - wait_for_ack(request, require_ack); + // Send request and optionally want for an ACK + send_request_packet(OP_BLOCK_WRITE, first_addr, data, timestamp, ack); */ } uint32_t peek32( uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) override { - // Send request - auto request = send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp); - - // Wait for an ACK - auto response = wait_for_ack(request, true).get(); - return response.data_vtr[0]; + // Send request and wait for an ACK + boost::optional<ctrl_payload> response; + std::tie(std::ignore, response) = + send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp); + return response.get().data_vtr[0]; } std::vector<uint32_t> block_peek32(uint32_t first_addr, @@ -131,15 +123,14 @@ public: return values; /* TODO: Uncomment when the atomic block peek is implemented in the FPGA - // Send request - auto request = send_request_packet(OP_READ, + // Send request and wait for an ACK + boost::optional<ctrl_payload> response; + std::tie(std::ignore, response) = send_request_packet(OP_READ, first_addr, std::vector<uint32_t>(length, 0), timestamp); - // Wait for an ACK - auto response = wait_for_ack(request, true).get(); - return response.data_vtr; + return response.get().data_vtr; */ } @@ -153,30 +144,24 @@ public: // TODO: Uncomment when this is implemented in the FPGA throw uhd::not_implemented_error("Control poll not implemented in the FPGA"); - // Send request - auto request = send_request_packet(OP_POLL, + // Send request and optionally wait for an ACK + send_request_packet(OP_POLL, addr, {data, mask, static_cast<uint32_t>(timeout.to_ticks(_timebase_clk.get_freq()))}, - timestamp); - - // Optionally wait for an ACK - const bool require_ack = ack || _policy.force_acks; - wait_for_ack(request, require_ack); + timestamp, + ack); } void sleep(uhd::time_spec_t duration, bool ack = false) override { - // Send request - auto request = send_request_packet(OP_SLEEP, + // Send request and optionally wait for an ACK + send_request_packet(OP_SLEEP, 0, {static_cast<uint32_t>(duration.to_ticks(_timebase_clk.get_freq()))}, - uhd::time_spec_t::ASAP); - - // Optionally wait for an ACK - const bool require_ack = ack || _policy.force_acks; - wait_for_ack(request, require_ack); + uhd::time_spec_t::ASAP, + ack); } void register_async_msg_validator(async_msg_validator_t callback_f) override @@ -217,20 +202,44 @@ public: } // Pop the request from the queue _req_queue.pop_front(); - // Push the response into the response queue - _resp_queue.push(std::make_tuple(rx_ctrl, resp_status)); - _resp_ready_cond.notify_one(); + // Push the response into the response queue if and only if + // the client wanted an ACK for this packet + wanted_ack_key request_key{ + rx_ctrl.seq_num, rx_ctrl.op_code, rx_ctrl.address}; + if (_wanted_acks.count(request_key)) { + _wanted_acks.erase(request_key); + _resp_queue.push_back(std::make_tuple(rx_ctrl, resp_status)); + _resp_ready_cond.notify_all(); + } else { + // If the client didn't want an ACK for this request, but + // the response indicated an error, then provide some + // feedback + if (rx_ctrl.status != CMD_OKAY) { + log_response_packet_error(rx_ctrl); + } + } }; // Function to process a response with sequence errors auto process_incorrect_response = [this]() { // Grant flow control credits _buff_occupied -= get_payload_size(_req_queue.front()); _buff_free_cond.notify_one(); - // Push a fabricated response into the response queue + // Push a fabricated response into the response queue only + // if the client was waiting on an ACK for the original + // request whose response was lost ctrl_payload resp(_req_queue.front()); - resp.is_ack = true; - _resp_queue.push(std::make_tuple(resp, RESP_DROPPED)); - _resp_ready_cond.notify_one(); + wanted_ack_key request_key{resp.seq_num, resp.op_code, resp.address}; + if (_wanted_acks.count(request_key)) { + _wanted_acks.erase(request_key); + resp.is_ack = true; + _resp_queue.push_back(std::make_tuple(resp, RESP_DROPPED)); + _resp_ready_cond.notify_all(); + } else { + // If the client did not want an ACK for this request, but + // we fabricated a response due to a sequence error, then + // provide feedback + log_dropped_packet(resp); + } // Pop the request from the queue _req_queue.pop_front(); }; @@ -321,6 +330,9 @@ public: } private: + //! The software status (different from the transaction status) of the response + enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR }; + //! Returns the length of the control payload in 32-bit words inline static size_t get_payload_size(const ctrl_payload& payload) { @@ -344,11 +356,14 @@ private: return false; } - //! Sends a request control packet to a remote device - const ctrl_payload send_request_packet(ctrl_opcode_t op_code, + //! Sends a request control packet to a remote device, optionally waiting + // for an ACK, and returns any response if applicable + const std::pair<ctrl_payload, boost::optional<ctrl_payload>> send_request_packet( + ctrl_opcode_t op_code, uint32_t address, const std::vector<uint32_t>& data_vtr, - const uhd::time_spec_t& time_spec) + const uhd::time_spec_t& time_spec, + const bool require_ack = true) { if (!_client_clk.is_running()) { throw uhd::system_error("Ctrlport client clock is not running"); @@ -387,7 +402,7 @@ private: // If we can fit the current request in the queue then we can proceed return (_buff_occupied + pyld_size) <= (_buff_capacity - - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs)); + - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs)); }; if (!buff_not_full()) { // If there is a timed command in the queue, use the @@ -403,90 +418,122 @@ private: _buff_occupied += pyld_size; _req_queue.push_back(tx_ctrl); - // Send the payload as soon as there is room in the buffer - _handle_send(tx_ctrl, _policy.timeout); - _tx_seq_num = (_tx_seq_num + 1) % 64; + if (require_ack || _policy.force_acks) { + // If the client wants an ACK for this request, make note of its + // details in a set. This set will be consulted when responses are + // received. + wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address}; + _wanted_acks.insert(ack_key); + } - return tx_ctrl; + try { + // Send the payload as soon as there is room in the buffer + _handle_send(tx_ctrl, _policy.timeout); + _tx_seq_num = (_tx_seq_num + 1) % 64; + + if (require_ack || _policy.force_acks) { + auto response = wait_for_ack(tx_ctrl, lock); + return {tx_ctrl, response}; + } else { + return {tx_ctrl, {}}; + } + } catch (...) { + // Something went wrong while trying to send the request. + // Remove the entry from the ACK tracking set. + wanted_ack_key ack_key{tx_ctrl.seq_num, tx_ctrl.op_code, tx_ctrl.address}; + _wanted_acks.erase(ack_key); + throw; + } } //! Waits for and returns the ACK for the specified request - // - // \param request The request for which we are awaiting the response - // \param require_ack A Boolean flag which indicates if we really need that - // response. If false, we reduce the timeout to zero - // and return an empty control payload if we can't find - // the relevant ACK. This can be used to help clear the - // response queue without waiting. - // \returns The response payload corresponding to \p requst. If \p require_ack - // is false, this may also be an empty ctrl_payload object with no - // meaningful content. - // \throws uhd::op_timeout if require_ack is true, and we exceed the timeout - // set by the current policy. Throws various other uhd::rfnoc_error - // when there was a communication issue (see the code for details). - const boost::optional<ctrl_payload> wait_for_ack( - const ctrl_payload& request, const bool require_ack) + const ctrl_payload wait_for_ack( + const ctrl_payload& request, std::unique_lock<std::mutex>& lock) { - auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); }; - while (true) { - std::unique_lock<std::mutex> lock(_mutex); - // Wait until there is a response in the response queue - if (!resp_ready()) { - if (!require_ack) { - return {}; - } - // If we're waiting for a timed command or if we have a - // command in the queue, use the MASSIVE_TIMEOUT instead - auto timeout_time = start_timeout( - check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout); - - if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) { - throw uhd::op_timeout("Control operation timed out waiting for ACK"); - } - } + // Determine the timepoint (now plus some offset, depending on + // whether we're waiting for a timed command or if we have a + // command in the queue) at which point we flag a timeout. + const auto timeout_time = + start_timeout(check_timed_in_queue() ? MASSIVE_TIMEOUT : _policy.timeout); + + // Check the queue for the response for the specific request, looping + // until it's found or the timeout timepoint has been reached. + do { // Extract the response packet ctrl_payload rx_ctrl; response_status_t resp_status; - std::tie(rx_ctrl, resp_status) = _resp_queue.front(); - _resp_queue.pop(); - // Check if this is the response meant for the request - // Filter by op_code, address and seq_num - if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code - && rx_ctrl.address == request.address) { - // If we find the response for our request, and the client - // has not requested acknowledgements, then simply return, - // ignoring any error status in the response. This restores - // the pre-0caed5529 behavior. - if (!require_ack) { - return {}; - } - // Validate transaction status - if (rx_ctrl.status == CMD_CMDERR) { - throw uhd::op_failed("Control operation returned a failing status"); - } else if (rx_ctrl.status == CMD_TSERR) { - throw uhd::op_timerr("Control operation returned a timestamp error"); - } - // Check data vector size - if (rx_ctrl.data_vtr.empty()) { - throw uhd::op_failed( - "Control operation returned a malformed response"); + // Iterate the response queue, looking for the response that is + // meant for the request. When found, it is removed from the queue. + // Other elements are left undisturbed. + for (auto q_iterator = _resp_queue.begin(); q_iterator != _resp_queue.end(); + q_iterator++) { + std::tie(rx_ctrl, resp_status) = *q_iterator; + // Check if this is the response meant for the request + // Filter by op_code, address and seq_num + if (rx_ctrl.seq_num == request.seq_num + && rx_ctrl.op_code == request.op_code + && rx_ctrl.address == request.address) { + // Remove the response from the queue + _resp_queue.erase(q_iterator); + + // Validate transaction status, either returning the + // response if everything checks out, or throwing an + // exception if the status indicates an error + return validate_ack(rx_ctrl, resp_status); } - // Validate response status - if (resp_status == RESP_DROPPED) { - throw uhd::op_seqerr( - "Response for a control transaction was dropped"); - } else if (resp_status == RESP_RTERR) { - throw uhd::op_timerr("Control operation encountered a routing error"); - } - return rx_ctrl; - } else { - // This response does not belong to the request we passed in. Move on. - continue; } + // If we got here, that means we iterated the queue and did NOT + // find a matching response to the request that this thread sent. + // We can't return, because if we're in this function, the caller + // specifically requested to receive an ACK for their request, + // and we haven't received it yet. So we must wait until a new + // request is added to the queue and try again. If we reach the + // timeout timepoint before a new request is added, then throw the + // timeout exception. + } while ( + _resp_ready_cond.wait_until(lock, timeout_time) != std::cv_status::timeout); + + throw uhd::op_timeout("Control operation timed out waiting for ACK"); + } + + const ctrl_payload validate_ack( + const ctrl_payload& rx_ctrl, response_status_t resp_status) const + { + if (rx_ctrl.status == CMD_CMDERR) { + throw uhd::op_failed("Control operation returned a failing status"); + } else if (rx_ctrl.status == CMD_TSERR) { + throw uhd::op_timerr("Control operation returned a timestamp error"); } + // Check data vector size + if (rx_ctrl.data_vtr.empty()) { + throw uhd::op_failed("Control operation returned a malformed response"); + } + // Validate response status + if (resp_status == RESP_DROPPED) { + throw uhd::op_seqerr("Response for a control transaction was dropped"); + } else if (resp_status == RESP_RTERR) { + throw uhd::op_timerr("Control operation encountered a routing error"); + } + return rx_ctrl; } + void log_response_packet_error(const ctrl_payload& resp) + { + std::string packet = resp.to_string(); + packet.pop_back(); // Remove the trailing \n + UHD_LOG_DEBUG("CTRLEP", + "Control response for ack-less request returned a failing status: " + << packet); + } + + void log_dropped_packet(const ctrl_payload& resp) + { + std::string packet = resp.to_string(); + packet.pop_back(); // Remove the trailing \n + UHD_LOG_DEBUG( + "CTRLEP", "Control response for ack-less request was dropped: " << packet); + } //! The parameters associated with the policy that governs this object struct policy_args @@ -494,8 +541,6 @@ private: double timeout = DEFAULT_TIMEOUT; bool force_acks = DEFAULT_FORCE_ACKS; }; - //! The software status (different from the transaction status) of the response - enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR }; //! Function to call to send a control packet const send_fn_t _handle_send; @@ -529,11 +574,15 @@ private: //! A queue that holds all outstanding requests std::deque<ctrl_payload> _req_queue; //! A queue that holds all outstanding responses and their status - std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue; + std::deque<std::tuple<ctrl_payload, response_status_t>> _resp_queue; //! A condition variable that hold the "response is available" condition std::condition_variable _resp_ready_cond; //! A mutex to protect all state in this class std::mutex _mutex; + //! A set of {opcode, address, sequence numbers} triples associated with + // request packets for which the client cares about receiving ACKs + using wanted_ack_key = std::tuple<uint8_t, ctrl_opcode_t, uint32_t>; + std::set<wanted_ack_key> _wanted_acks; }; ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send, |