aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc/ctrlport_endpoint.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc/ctrlport_endpoint.cpp')
-rw-r--r--host/lib/rfnoc/ctrlport_endpoint.cpp471
1 files changed, 471 insertions, 0 deletions
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
new file mode 100644
index 000000000..ade5b89ee
--- /dev/null
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -0,0 +1,471 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_types.hpp>
+#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
+#include <condition_variable>
+#include <boost/format.hpp>
+#include <mutex>
+#include <queue>
+
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::chdr;
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+namespace {
+//! Max async msg (CTRL_WRITE) size in 32-bit words (2 hdr, 2 TS, 1 op-word, 1 data)
+constexpr size_t ASYNC_MESSAGE_SIZE = 6;
+//! Default completion timeout for transactions
+constexpr double DEFAULT_TIMEOUT = 0.1;
+//! Default value for whether ACKs are always required
+constexpr bool DEFAULT_FORCE_ACKS = false;
+}
+
+ctrlport_endpoint::~ctrlport_endpoint() = default;
+
+class ctrlport_endpoint_impl : public ctrlport_endpoint
+{
+public:
+ ctrlport_endpoint_impl(const send_fn_t& send_fcn,
+ sep_id_t my_epid,
+ uint16_t local_port,
+ size_t buff_capacity,
+ size_t max_outstanding_async_msgs,
+ double ctrl_clk_freq,
+ double timebase_freq)
+ : _handle_send(send_fcn)
+ , _my_epid(my_epid)
+ , _local_port(local_port)
+ , _buff_capacity(buff_capacity)
+ , _max_outstanding_async_msgs(max_outstanding_async_msgs)
+ , _ctrl_clk_freq(ctrl_clk_freq)
+ , _timebase_freq(timebase_freq)
+ {
+ }
+
+ virtual ~ctrlport_endpoint_impl() = default;
+
+ virtual void poke32(uint32_t addr,
+ uint32_t data,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request =
+ send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ }
+
+ virtual void multi_poke32(const std::vector<uint32_t> addrs,
+ const std::vector<uint32_t> data,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ if (addrs.size() != data.size()) {
+ throw uhd::value_error("addrs and data vectors must be of the same length");
+ }
+ for (size_t i = 0; i < data.size(); i++) {
+ poke32(addrs[i],
+ data[i],
+ (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
+ (i == data.size() - 1) ? ack : false);
+ }
+ }
+
+ virtual void block_poke32(uint32_t first_addr,
+ const std::vector<uint32_t> data,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ for (size_t i = 0; i < data.size(); i++) {
+ poke32(first_addr + (i * sizeof(uint32_t)),
+ data[i],
+ (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
+ (i == data.size() - 1) ? ack : false);
+ }
+
+ /* TODO: Uncomment when the atomic block poke is implemented in the FPGA
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(
+ OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ */
+ }
+
+ virtual uint32_t peek32(
+ uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
+ {
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request =
+ send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time);
+ // Wait for an ACK
+ auto response = wait_for_ack(request, timeout_time);
+ return response.data_vtr[0];
+ }
+
+ virtual std::vector<uint32_t> block_peek32(uint32_t first_addr,
+ size_t length,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
+ {
+ std::vector<uint32_t> values;
+ for (size_t i = 0; i < length; i++) {
+ values.push_back(peek32(first_addr + (i * sizeof(uint32_t)),
+ (i == 0) ? timestamp : uhd::time_spec_t::ASAP));
+ }
+ return values;
+
+ /* TODO: Uncomment when the atomic block peek is implemented in the FPGA
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(OP_READ,
+ first_addr,
+ std::vector<uint32_t>(length, 0),
+ timestamp,
+ timeout_time);
+ // Wait for an ACK
+ auto response = wait_for_ack(request, timeout_time);
+ return response.data_vtr;
+ */
+ }
+
+ virtual void poll32(uint32_t addr,
+ uint32_t data,
+ uint32_t mask,
+ uhd::time_spec_t timeout,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ // TODO: Uncomment when this is implemented in the FPGA
+ throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
+
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(OP_POLL,
+ addr,
+ {data, mask, static_cast<uint32_t>(timeout.get_real_secs() * _ctrl_clk_freq)},
+ timestamp,
+ timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ }
+
+ virtual void sleep(uhd::time_spec_t duration, bool ack = false)
+ {
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(OP_SLEEP,
+ 0,
+ {static_cast<uint32_t>(duration.get_real_secs() * _ctrl_clk_freq)},
+ uhd::time_spec_t::ASAP,
+ timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ }
+
+ virtual void register_async_msg_handler(async_msg_callback_t callback_f)
+ {
+ _handle_async_msg = callback_f;
+ }
+
+ virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)
+ {
+ if (name == "default") {
+ _policy.timeout = args.cast<double>("timeout", DEFAULT_TIMEOUT);
+ _policy.force_acks = DEFAULT_FORCE_ACKS;
+ } else {
+ // TODO: Uncomment when custom policies are implemented
+ throw uhd::not_implemented_error("Policy implemented in the FPGA");
+ }
+ }
+
+ virtual void handle_recv(const ctrl_payload& rx_ctrl)
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+ if (rx_ctrl.is_ack) {
+ // Function to process a response with no sequence errors
+ auto process_correct_response = [this, rx_ctrl]() {
+ response_status_t resp_status = RESP_VALID;
+ // Grant flow control credits
+ _buff_occupied -= get_payload_size(_req_queue.front());
+ _buff_free_cond.notify_one();
+ if (get_payload_size(_req_queue.front()) != get_payload_size(rx_ctrl)) {
+ resp_status = RESP_SIZEERR;
+ }
+ // Pop the request from the queue
+ _req_queue.pop();
+ // Push the response into the response queue
+ _resp_queue.push(std::make_tuple(rx_ctrl, resp_status));
+ _resp_ready_cond.notify_one();
+ };
+ // Function to process a response with sequence errors
+ auto process_incorrect_response = [this]() {
+ // Grant flow control credits
+ _buff_occupied -= get_payload_size(_req_queue.front());
+ _buff_free_cond.notify_one();
+ // Push a fabricated response into the response queue
+ ctrl_payload resp(_req_queue.front());
+ resp.is_ack = true;
+ _resp_queue.push(std::make_tuple(resp, RESP_DROPPED));
+ _resp_ready_cond.notify_one();
+ // Pop the request from the queue
+ _req_queue.pop();
+ };
+
+ // Peek at the request queue to check the expected sequence number
+ int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num);
+ if (seq_num_diff == 0) { // No sequence error
+ process_correct_response();
+ } else if (seq_num_diff > 0) { // Packet(s) dropped
+ // Tag all dropped packets
+ for (int8_t i = 0; i < seq_num_diff; i++) {
+ process_incorrect_response();
+ }
+ // Process correct response
+ process_correct_response();
+ } else { // Reordered packet(s)
+ // Requests are processed in order. If seq_num_diff is negative then we
+ // have either already seen this response or we have dropped >128
+ // responses. Either way ignore this packet.
+ }
+ } else {
+ // Handle asynchronous message callback
+ ctrl_status_t status = CMD_CMDERR;
+ if (rx_ctrl.op_code != OP_WRITE) {
+ UHD_LOG_ERROR(
+ "CTRLEP", "Malformed async message request: Invalid opcode");
+ } else if (rx_ctrl.dst_port != _local_port) {
+ UHD_LOG_ERROR("CTRLEP", "Malformed async message request: Invalid port");
+ } else if (rx_ctrl.data_vtr.empty()) {
+ UHD_LOG_ERROR(
+ "CTRLEP", "Malformed async message request: Invalid num_data");
+ } else {
+ try {
+ _handle_async_msg(rx_ctrl.address, rx_ctrl.data_vtr[0]);
+ status = CMD_OKAY;
+ } catch (...) {
+ UHD_LOG_ERROR("CTRLEP", "Async message handler threw an exception");
+ }
+ }
+ try {
+ // Respond with an ACK packet
+ // Flow control not needed because we have allocated special room in the
+ // buffer for async message responses
+ ctrl_payload tx_ctrl(rx_ctrl);
+ tx_ctrl.is_ack = true;
+ tx_ctrl.src_epid = _my_epid;
+ tx_ctrl.status = status;
+ _handle_send(tx_ctrl, _policy.timeout);
+ } catch (...) {
+ UHD_LOG_ERROR("CTRLEP",
+ "Encountered an error sending a response for an async message");
+ }
+ }
+ }
+
+private:
+ //! Returns the length of the control payload in 32-bit words
+ inline static size_t get_payload_size(const ctrl_payload& payload)
+ {
+ return 2 + (payload.timestamp.is_initialized() ? 2 : 0) + payload.data_vtr.size();
+ }
+
+ //! Marks the start of a timeout for an operation and returns the expiration time
+ inline const steady_clock::time_point start_timeout(double duration)
+ {
+ return steady_clock::now() + (static_cast<int>(std::ceil(duration / 1e-6)) * 1us);
+ }
+
+ //! Sends a request control packet to a remote device
+ const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
+ uint32_t address,
+ const std::vector<uint32_t>& data_vtr,
+ const uhd::time_spec_t& time_spec,
+ const steady_clock::time_point& timeout_time)
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+
+ // Convert from uhd::time_spec to timestamp
+ boost::optional<uint64_t> timestamp;
+ if (time_spec != time_spec_t::ASAP) {
+ timestamp = time_spec.to_ticks(_timebase_freq);
+ }
+
+ // Assemble the control payload
+ ctrl_payload tx_ctrl;
+ tx_ctrl.dst_port = _local_port;
+ tx_ctrl.src_port = _local_port;
+ tx_ctrl.seq_num = _tx_seq_num;
+ tx_ctrl.timestamp = timestamp;
+ tx_ctrl.is_ack = false;
+ tx_ctrl.src_epid = _my_epid;
+ tx_ctrl.address = address;
+ tx_ctrl.data_vtr = data_vtr;
+ tx_ctrl.byte_enable = 0xF;
+ tx_ctrl.op_code = op_code;
+ tx_ctrl.status = CMD_OKAY;
+
+ // Perform flow control
+ // If there is no room in the downstream buffer, then wait until the timeout
+ size_t pyld_size = get_payload_size(tx_ctrl);
+ auto buff_not_full = [this, pyld_size]() -> bool {
+ // Allocate room in the queue for one async response packet
+ // If we can fit the current request in the queue then we can proceed
+ return (_buff_occupied + pyld_size)
+ <= (_buff_capacity
+ - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
+ };
+ if (!buff_not_full()) {
+ if (not _buff_free_cond.wait_until(lock, timeout_time, buff_not_full)) {
+ throw uhd::op_timeout(
+ "Control operation timed out waiting for space in command buffer");
+ }
+ }
+ _buff_occupied += pyld_size;
+ _req_queue.push(tx_ctrl);
+
+ // Send the payload as soon as there is room in the buffer
+ _handle_send(tx_ctrl, _policy.timeout);
+ _tx_seq_num = (_tx_seq_num + 1) % 64;
+
+ return tx_ctrl;
+ }
+
+ //! Waits for and returns the ACK for the specified request
+ const ctrl_payload wait_for_ack(
+ const ctrl_payload& request, const steady_clock::time_point& timeout_time)
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+
+ auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
+ while (true) {
+ // Wait until there is a response in the response queue
+ if (!resp_ready()) {
+ if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
+ throw uhd::op_timeout("Control operation timed out waiting for ACK");
+ }
+ }
+ // Extract the response packet
+ ctrl_payload rx_ctrl;
+ response_status_t resp_status;
+ std::tie(rx_ctrl, resp_status) = _resp_queue.front();
+ _resp_queue.pop();
+ // Check if this is the response meant for the request
+ // Filter by op_code, address and seq_num
+ if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code
+ && rx_ctrl.address == request.address) {
+ // Validate transaction status
+ if (rx_ctrl.status == CMD_CMDERR) {
+ throw uhd::op_failed("Control operation returned a failing status");
+ } else if (rx_ctrl.status == CMD_TSERR) {
+ throw uhd::op_timerr("Control operation returned a timestamp error");
+ }
+ // Check data vector size
+ if (rx_ctrl.data_vtr.size() == 0) {
+ throw uhd::op_failed(
+ "Control operation returned a malformed response");
+ }
+ // Validate response status
+ if (resp_status == RESP_DROPPED) {
+ throw uhd::op_seqerr(
+ "Response for a control transaction was dropped");
+ } else if (resp_status == RESP_RTERR) {
+ throw uhd::op_timerr("Control operation encountered a routing error");
+ }
+ return rx_ctrl;
+ } else {
+ // This response does not belong to the request we passed in. Move on.
+ continue;
+ }
+ }
+ }
+
+
+ //! The parameters associated with the policy that governs this object
+ struct policy_args
+ {
+ double timeout = DEFAULT_TIMEOUT;
+ bool force_acks = DEFAULT_FORCE_ACKS;
+ };
+ //! The software status (different from the transaction status) of the response
+ enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
+
+ //! Function to call to send a control packet
+ const send_fn_t _handle_send;
+ //! The endpoint ID of this software endpoint
+ const sep_id_t _my_epid;
+ //! The local port number on the control crossbar for this ctrlport endpoint
+ const uint16_t _local_port;
+ //! The downstream buffer capacity in 32-bit words (used for flow control)
+ const size_t _buff_capacity;
+ //! The max number of outstanding async messages that a block can have at any time
+ const size_t _max_outstanding_async_msgs;
+ //! The clock rate of the clock that drives the ctrlport endpoint
+ const double _ctrl_clk_freq;
+ //! The clock rate of the primary timebase used for timed commands
+ const double _timebase_freq;
+
+ //! The function to call to handle an async message
+ async_msg_callback_t _handle_async_msg = [](uint32_t, uint32_t) {};
+ //! The current control sequence number of outgoing packets
+ uint8_t _tx_seq_num = 0;
+ //! The number of occupied words in the downstream buffer
+ ssize_t _buff_occupied = 0;
+ //! The arguments for the policy that governs this register interface
+ policy_args _policy;
+ //! A condition variable that hold the "downstream buffer is free" condition
+ std::condition_variable _buff_free_cond;
+ //! A queue that holds all outstanding requests
+ std::queue<ctrl_payload> _req_queue;
+ //! A queue that holds all outstanding responses and their status
+ std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
+ //! A condition variable that hold the "response is available" condition
+ std::condition_variable _resp_ready_cond;
+ //! A mutex to protect all state in this class
+ std::mutex _mutex;
+};
+
+ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,
+ sep_id_t this_epid,
+ uint16_t local_port,
+ size_t buff_capacity,
+ size_t max_outstanding_async_msgs,
+ double ctrl_clk_freq,
+ double timebase_freq)
+{
+ return std::make_shared<ctrlport_endpoint_impl>(handle_send,
+ this_epid,
+ local_port,
+ buff_capacity,
+ max_outstanding_async_msgs,
+ ctrl_clk_freq,
+ timebase_freq);
+}