aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/rfnoc
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/rfnoc')
-rw-r--r--host/lib/rfnoc/CMakeLists.txt2
-rw-r--r--host/lib/rfnoc/chdr/chdr_types.cpp3
-rw-r--r--host/lib/rfnoc/chdr_ctrl_endpoint.cpp180
-rw-r--r--host/lib/rfnoc/ctrlport_endpoint.cpp471
4 files changed, 655 insertions, 1 deletions
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index 6aab0d499..0d989b83b 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -27,6 +27,8 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/node_ctrl_base.cpp
${CMAKE_CURRENT_SOURCE_DIR}/node.cpp
${CMAKE_CURRENT_SOURCE_DIR}/rate_node_ctrl.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/ctrlport_endpoint.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_endpoint.cpp
${CMAKE_CURRENT_SOURCE_DIR}/rx_stream_terminator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/scalar_node_ctrl.cpp
${CMAKE_CURRENT_SOURCE_DIR}/sink_block_ctrl_base.cpp
diff --git a/host/lib/rfnoc/chdr/chdr_types.cpp b/host/lib/rfnoc/chdr/chdr_types.cpp
index 8786b8193..8920e4fe3 100644
--- a/host/lib/rfnoc/chdr/chdr_types.cpp
+++ b/host/lib/rfnoc/chdr/chdr_types.cpp
@@ -57,7 +57,8 @@ size_t ctrl_payload::serialize(uint64_t* buff,
| ((static_cast<uint64_t>(data_vtr.size()) & mask_u64(NUM_DATA_WIDTH))
<< NUM_DATA_OFFSET)
| ((static_cast<uint64_t>(seq_num) & mask_u64(SEQ_NUM_WIDTH)) << SEQ_NUM_OFFSET)
- | ((static_cast<uint64_t>(timestamp ? 1 : 0) & mask_u64(HAS_TIME_WIDTH))
+ | ((static_cast<uint64_t>(timestamp.is_initialized() ? 1 : 0)
+ & mask_u64(HAS_TIME_WIDTH))
<< HAS_TIME_OFFSET)
| ((static_cast<uint64_t>(is_ack) & mask_u64(IS_ACK_WIDTH)) << IS_ACK_OFFSET)
| ((static_cast<uint64_t>(src_epid) & mask_u64(SRC_EPID_WIDTH))
diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
new file mode 100644
index 000000000..cab90fb49
--- /dev/null
+++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
@@ -0,0 +1,180 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/safe_call.hpp>
+#include <uhd/utils/thread.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_types.hpp>
+#include <uhdlib/rfnoc/chdr_ctrl_endpoint.hpp>
+#include <boost/format.hpp>
+#include <mutex>
+#include <thread>
+#include <atomic>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::chdr;
+
+using namespace std::placeholders;
+
+chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;
+
+class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
+{
+public:
+ chdr_ctrl_endpoint_impl(const both_xports_t& xports,
+ const chdr::chdr_packet_factory& pkt_factory,
+ sep_id_t dst_epid,
+ sep_id_t my_epid)
+ : _dst_epid(dst_epid)
+ , _my_epid(my_epid)
+ , _xports(xports)
+ , _send_seqnum(0)
+ , _send_pkt(pkt_factory.make_ctrl())
+ , _recv_pkt(pkt_factory.make_ctrl())
+ , _stop_recv_thread(false)
+ , _recv_thread([this]() { recv_worker(); })
+ {
+ const std::string thread_name(str(boost::format("uhd_ctrl_ep%04x") % _my_epid));
+ uhd::set_thread_name(&_recv_thread, thread_name);
+ UHD_LOG_DEBUG("RFNOC",
+ boost::format("Started thread %s to process messages for CHDR Ctrl EP for "
+ "EPID %d -> EPID %d")
+ % thread_name % _my_epid % _dst_epid);
+ }
+
+ virtual ~chdr_ctrl_endpoint_impl()
+ {
+ UHD_SAFE_CALL(
+ // Interrupt buffer updater loop
+ _stop_recv_thread = true;
+ // Wait for loop to finish
+ // No timeout on join. The recv loop is guaranteed
+ // to terminate in a reasonable amount of time because
+ // there are no timed blocks on the underlying.
+ _recv_thread.join();
+ // Flush base transport
+ while (_xports.recv->get_recv_buff(0.0001)) /*NOP*/;
+ // Release child endpoints
+ _endpoint_map.clear(););
+ }
+
+ virtual ctrlport_endpoint::sptr get_ctrlport_ep(uint16_t port,
+ size_t buff_capacity,
+ size_t max_outstanding_async_msgs,
+ double ctrl_clk_freq,
+ double timebase_freq)
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ // Function to send a control payload
+ auto send_fn = [this](const ctrl_payload& payload, double timeout) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ // Build header
+ chdr_header header;
+ header.set_pkt_type(PKT_TYPE_CTRL);
+ header.set_num_mdata(0);
+ header.set_seq_num(_send_seqnum++);
+ header.set_dst_epid(_dst_epid);
+ // Acquire send buffer and send the packet
+ auto send_buff = _xports.send->get_send_buff(timeout);
+ _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
+ send_buff->commit(header.get_length());
+ };
+
+ if (_endpoint_map.find(port) == _endpoint_map.end()) {
+ ctrlport_endpoint::sptr ctrlport_ep = ctrlport_endpoint::make(send_fn,
+ _my_epid,
+ port,
+ buff_capacity,
+ max_outstanding_async_msgs,
+ ctrl_clk_freq,
+ timebase_freq);
+ _endpoint_map.insert(std::make_pair(port, ctrlport_ep));
+ UHD_LOG_DEBUG("RFNOC",
+ boost::format("Created ctrlport endpoint for port %d on EPID %d") % port
+ % _my_epid);
+ return ctrlport_ep;
+ } else {
+ return _endpoint_map.at(port);
+ }
+ }
+
+ virtual size_t get_num_drops() const
+ {
+ return _num_drops;
+ }
+
+private:
+ void recv_worker()
+ {
+ // Run forever:
+ // - Pull packets from the base transport
+ // - Route them based on the dst_port
+ // - Pass them to the ctrlport_endpoint for additional processing
+ while (not _stop_recv_thread) {
+ auto buff = _xports.recv->get_recv_buff(0.0);
+ if (buff) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ try {
+ _recv_pkt->refresh(buff->cast<void*>());
+ const ctrl_payload payload = _recv_pkt->get_payload();
+ if (_endpoint_map.find(payload.dst_port) != _endpoint_map.end()) {
+ _endpoint_map.at(payload.dst_port)->handle_recv(payload);
+ }
+ } catch (...) {
+ // Ignore all errors
+ }
+ } else {
+ // Be a good citizen and yield if no packet is processed
+ static const size_t MIN_DUR = 1;
+ boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
+ // We call sleep(MIN_DUR) above instead of yield() to ensure that we
+ // relinquish the current scheduler time slot.
+ // yield() is a hint to the scheduler to end the time
+ // slice early and schedule in another thread that is ready to run.
+ // However in most situations, there will be no other thread and
+ // this thread will continue to run which will rail a CPU core.
+ // We call sleep(MIN_DUR=1) instead which will sleep for a minimum
+ // time. Ideally we would like to use boost::chrono::.*seconds::min()
+ // but that is bound to 0, which causes the sleep_for call to be a
+ // no-op and thus useless to actually force a sleep.
+ }
+ }
+ }
+
+ // The endpoint ID of the destination
+ const sep_id_t _dst_epid;
+ // The endpoint ID of this software endpoint
+ const sep_id_t _my_epid;
+ // Send/recv transports
+ const uhd::both_xports_t _xports;
+ // The curent sequence number for a send packet
+ size_t _send_seqnum = 0;
+ // The number of packets dropped
+ size_t _num_drops = 0;
+ // Packet containers
+ chdr_ctrl_packet::uptr _send_pkt;
+ chdr_ctrl_packet::cuptr _recv_pkt;
+ // A collection of ctrlport endpoints (keyed by the port number)
+ std::map<uint16_t, ctrlport_endpoint::sptr> _endpoint_map;
+ // A thread that will handle all responses and async message requests
+ std::atomic_bool _stop_recv_thread;
+ std::thread _recv_thread;
+ // Mutex that protects all state in this class
+ std::mutex _mutex;
+};
+
+chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const both_xports_t& xports,
+ const chdr::chdr_packet_factory& pkt_factory,
+ sep_id_t dst_epid,
+ sep_id_t my_epid)
+{
+ return std::make_unique<chdr_ctrl_endpoint_impl>(
+ xports, pkt_factory, dst_epid, my_epid);
+}
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
new file mode 100644
index 000000000..ade5b89ee
--- /dev/null
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -0,0 +1,471 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_types.hpp>
+#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
+#include <condition_variable>
+#include <boost/format.hpp>
+#include <mutex>
+#include <queue>
+
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::chdr;
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+namespace {
+//! Max async msg (CTRL_WRITE) size in 32-bit words (2 hdr, 2 TS, 1 op-word, 1 data)
+constexpr size_t ASYNC_MESSAGE_SIZE = 6;
+//! Default completion timeout for transactions
+constexpr double DEFAULT_TIMEOUT = 0.1;
+//! Default value for whether ACKs are always required
+constexpr bool DEFAULT_FORCE_ACKS = false;
+}
+
+ctrlport_endpoint::~ctrlport_endpoint() = default;
+
+class ctrlport_endpoint_impl : public ctrlport_endpoint
+{
+public:
+ ctrlport_endpoint_impl(const send_fn_t& send_fcn,
+ sep_id_t my_epid,
+ uint16_t local_port,
+ size_t buff_capacity,
+ size_t max_outstanding_async_msgs,
+ double ctrl_clk_freq,
+ double timebase_freq)
+ : _handle_send(send_fcn)
+ , _my_epid(my_epid)
+ , _local_port(local_port)
+ , _buff_capacity(buff_capacity)
+ , _max_outstanding_async_msgs(max_outstanding_async_msgs)
+ , _ctrl_clk_freq(ctrl_clk_freq)
+ , _timebase_freq(timebase_freq)
+ {
+ }
+
+ virtual ~ctrlport_endpoint_impl() = default;
+
+ virtual void poke32(uint32_t addr,
+ uint32_t data,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request =
+ send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ }
+
+ virtual void multi_poke32(const std::vector<uint32_t> addrs,
+ const std::vector<uint32_t> data,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ if (addrs.size() != data.size()) {
+ throw uhd::value_error("addrs and data vectors must be of the same length");
+ }
+ for (size_t i = 0; i < data.size(); i++) {
+ poke32(addrs[i],
+ data[i],
+ (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
+ (i == data.size() - 1) ? ack : false);
+ }
+ }
+
+ virtual void block_poke32(uint32_t first_addr,
+ const std::vector<uint32_t> data,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ for (size_t i = 0; i < data.size(); i++) {
+ poke32(first_addr + (i * sizeof(uint32_t)),
+ data[i],
+ (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
+ (i == data.size() - 1) ? ack : false);
+ }
+
+ /* TODO: Uncomment when the atomic block poke is implemented in the FPGA
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(
+ OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ */
+ }
+
+ virtual uint32_t peek32(
+ uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
+ {
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request =
+ send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time);
+ // Wait for an ACK
+ auto response = wait_for_ack(request, timeout_time);
+ return response.data_vtr[0];
+ }
+
+ virtual std::vector<uint32_t> block_peek32(uint32_t first_addr,
+ size_t length,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
+ {
+ std::vector<uint32_t> values;
+ for (size_t i = 0; i < length; i++) {
+ values.push_back(peek32(first_addr + (i * sizeof(uint32_t)),
+ (i == 0) ? timestamp : uhd::time_spec_t::ASAP));
+ }
+ return values;
+
+ /* TODO: Uncomment when the atomic block peek is implemented in the FPGA
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(OP_READ,
+ first_addr,
+ std::vector<uint32_t>(length, 0),
+ timestamp,
+ timeout_time);
+ // Wait for an ACK
+ auto response = wait_for_ack(request, timeout_time);
+ return response.data_vtr;
+ */
+ }
+
+ virtual void poll32(uint32_t addr,
+ uint32_t data,
+ uint32_t mask,
+ uhd::time_spec_t timeout,
+ uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+ bool ack = false)
+ {
+ // TODO: Uncomment when this is implemented in the FPGA
+ throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
+
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(OP_POLL,
+ addr,
+ {data, mask, static_cast<uint32_t>(timeout.get_real_secs() * _ctrl_clk_freq)},
+ timestamp,
+ timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ }
+
+ virtual void sleep(uhd::time_spec_t duration, bool ack = false)
+ {
+ // Compute transaction expiration time
+ auto timeout_time = start_timeout(_policy.timeout);
+ // Send request
+ auto request = send_request_packet(OP_SLEEP,
+ 0,
+ {static_cast<uint32_t>(duration.get_real_secs() * _ctrl_clk_freq)},
+ uhd::time_spec_t::ASAP,
+ timeout_time);
+ // Optionally wait for an ACK
+ if (ack || _policy.force_acks) {
+ wait_for_ack(request, timeout_time);
+ }
+ }
+
+ virtual void register_async_msg_handler(async_msg_callback_t callback_f)
+ {
+ _handle_async_msg = callback_f;
+ }
+
+ virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)
+ {
+ if (name == "default") {
+ _policy.timeout = args.cast<double>("timeout", DEFAULT_TIMEOUT);
+ _policy.force_acks = DEFAULT_FORCE_ACKS;
+ } else {
+ // TODO: Uncomment when custom policies are implemented
+ throw uhd::not_implemented_error("Policy implemented in the FPGA");
+ }
+ }
+
+ virtual void handle_recv(const ctrl_payload& rx_ctrl)
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+ if (rx_ctrl.is_ack) {
+ // Function to process a response with no sequence errors
+ auto process_correct_response = [this, rx_ctrl]() {
+ response_status_t resp_status = RESP_VALID;
+ // Grant flow control credits
+ _buff_occupied -= get_payload_size(_req_queue.front());
+ _buff_free_cond.notify_one();
+ if (get_payload_size(_req_queue.front()) != get_payload_size(rx_ctrl)) {
+ resp_status = RESP_SIZEERR;
+ }
+ // Pop the request from the queue
+ _req_queue.pop();
+ // Push the response into the response queue
+ _resp_queue.push(std::make_tuple(rx_ctrl, resp_status));
+ _resp_ready_cond.notify_one();
+ };
+ // Function to process a response with sequence errors
+ auto process_incorrect_response = [this]() {
+ // Grant flow control credits
+ _buff_occupied -= get_payload_size(_req_queue.front());
+ _buff_free_cond.notify_one();
+ // Push a fabricated response into the response queue
+ ctrl_payload resp(_req_queue.front());
+ resp.is_ack = true;
+ _resp_queue.push(std::make_tuple(resp, RESP_DROPPED));
+ _resp_ready_cond.notify_one();
+ // Pop the request from the queue
+ _req_queue.pop();
+ };
+
+ // Peek at the request queue to check the expected sequence number
+ int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num);
+ if (seq_num_diff == 0) { // No sequence error
+ process_correct_response();
+ } else if (seq_num_diff > 0) { // Packet(s) dropped
+ // Tag all dropped packets
+ for (int8_t i = 0; i < seq_num_diff; i++) {
+ process_incorrect_response();
+ }
+ // Process correct response
+ process_correct_response();
+ } else { // Reordered packet(s)
+ // Requests are processed in order. If seq_num_diff is negative then we
+ // have either already seen this response or we have dropped >128
+ // responses. Either way ignore this packet.
+ }
+ } else {
+ // Handle asynchronous message callback
+ ctrl_status_t status = CMD_CMDERR;
+ if (rx_ctrl.op_code != OP_WRITE) {
+ UHD_LOG_ERROR(
+ "CTRLEP", "Malformed async message request: Invalid opcode");
+ } else if (rx_ctrl.dst_port != _local_port) {
+ UHD_LOG_ERROR("CTRLEP", "Malformed async message request: Invalid port");
+ } else if (rx_ctrl.data_vtr.empty()) {
+ UHD_LOG_ERROR(
+ "CTRLEP", "Malformed async message request: Invalid num_data");
+ } else {
+ try {
+ _handle_async_msg(rx_ctrl.address, rx_ctrl.data_vtr[0]);
+ status = CMD_OKAY;
+ } catch (...) {
+ UHD_LOG_ERROR("CTRLEP", "Async message handler threw an exception");
+ }
+ }
+ try {
+ // Respond with an ACK packet
+ // Flow control not needed because we have allocated special room in the
+ // buffer for async message responses
+ ctrl_payload tx_ctrl(rx_ctrl);
+ tx_ctrl.is_ack = true;
+ tx_ctrl.src_epid = _my_epid;
+ tx_ctrl.status = status;
+ _handle_send(tx_ctrl, _policy.timeout);
+ } catch (...) {
+ UHD_LOG_ERROR("CTRLEP",
+ "Encountered an error sending a response for an async message");
+ }
+ }
+ }
+
+private:
+ //! Returns the length of the control payload in 32-bit words
+ inline static size_t get_payload_size(const ctrl_payload& payload)
+ {
+ return 2 + (payload.timestamp.is_initialized() ? 2 : 0) + payload.data_vtr.size();
+ }
+
+ //! Marks the start of a timeout for an operation and returns the expiration time
+ inline const steady_clock::time_point start_timeout(double duration)
+ {
+ return steady_clock::now() + (static_cast<int>(std::ceil(duration / 1e-6)) * 1us);
+ }
+
+ //! Sends a request control packet to a remote device
+ const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
+ uint32_t address,
+ const std::vector<uint32_t>& data_vtr,
+ const uhd::time_spec_t& time_spec,
+ const steady_clock::time_point& timeout_time)
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+
+ // Convert from uhd::time_spec to timestamp
+ boost::optional<uint64_t> timestamp;
+ if (time_spec != time_spec_t::ASAP) {
+ timestamp = time_spec.to_ticks(_timebase_freq);
+ }
+
+ // Assemble the control payload
+ ctrl_payload tx_ctrl;
+ tx_ctrl.dst_port = _local_port;
+ tx_ctrl.src_port = _local_port;
+ tx_ctrl.seq_num = _tx_seq_num;
+ tx_ctrl.timestamp = timestamp;
+ tx_ctrl.is_ack = false;
+ tx_ctrl.src_epid = _my_epid;
+ tx_ctrl.address = address;
+ tx_ctrl.data_vtr = data_vtr;
+ tx_ctrl.byte_enable = 0xF;
+ tx_ctrl.op_code = op_code;
+ tx_ctrl.status = CMD_OKAY;
+
+ // Perform flow control
+ // If there is no room in the downstream buffer, then wait until the timeout
+ size_t pyld_size = get_payload_size(tx_ctrl);
+ auto buff_not_full = [this, pyld_size]() -> bool {
+ // Allocate room in the queue for one async response packet
+ // If we can fit the current request in the queue then we can proceed
+ return (_buff_occupied + pyld_size)
+ <= (_buff_capacity
+ - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
+ };
+ if (!buff_not_full()) {
+ if (not _buff_free_cond.wait_until(lock, timeout_time, buff_not_full)) {
+ throw uhd::op_timeout(
+ "Control operation timed out waiting for space in command buffer");
+ }
+ }
+ _buff_occupied += pyld_size;
+ _req_queue.push(tx_ctrl);
+
+ // Send the payload as soon as there is room in the buffer
+ _handle_send(tx_ctrl, _policy.timeout);
+ _tx_seq_num = (_tx_seq_num + 1) % 64;
+
+ return tx_ctrl;
+ }
+
+ //! Waits for and returns the ACK for the specified request
+ const ctrl_payload wait_for_ack(
+ const ctrl_payload& request, const steady_clock::time_point& timeout_time)
+ {
+ std::unique_lock<std::mutex> lock(_mutex);
+
+ auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
+ while (true) {
+ // Wait until there is a response in the response queue
+ if (!resp_ready()) {
+ if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
+ throw uhd::op_timeout("Control operation timed out waiting for ACK");
+ }
+ }
+ // Extract the response packet
+ ctrl_payload rx_ctrl;
+ response_status_t resp_status;
+ std::tie(rx_ctrl, resp_status) = _resp_queue.front();
+ _resp_queue.pop();
+ // Check if this is the response meant for the request
+ // Filter by op_code, address and seq_num
+ if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code
+ && rx_ctrl.address == request.address) {
+ // Validate transaction status
+ if (rx_ctrl.status == CMD_CMDERR) {
+ throw uhd::op_failed("Control operation returned a failing status");
+ } else if (rx_ctrl.status == CMD_TSERR) {
+ throw uhd::op_timerr("Control operation returned a timestamp error");
+ }
+ // Check data vector size
+ if (rx_ctrl.data_vtr.size() == 0) {
+ throw uhd::op_failed(
+ "Control operation returned a malformed response");
+ }
+ // Validate response status
+ if (resp_status == RESP_DROPPED) {
+ throw uhd::op_seqerr(
+ "Response for a control transaction was dropped");
+ } else if (resp_status == RESP_RTERR) {
+ throw uhd::op_timerr("Control operation encountered a routing error");
+ }
+ return rx_ctrl;
+ } else {
+ // This response does not belong to the request we passed in. Move on.
+ continue;
+ }
+ }
+ }
+
+
+ //! The parameters associated with the policy that governs this object
+ struct policy_args
+ {
+ double timeout = DEFAULT_TIMEOUT;
+ bool force_acks = DEFAULT_FORCE_ACKS;
+ };
+ //! The software status (different from the transaction status) of the response
+ enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
+
+ //! Function to call to send a control packet
+ const send_fn_t _handle_send;
+ //! The endpoint ID of this software endpoint
+ const sep_id_t _my_epid;
+ //! The local port number on the control crossbar for this ctrlport endpoint
+ const uint16_t _local_port;
+ //! The downstream buffer capacity in 32-bit words (used for flow control)
+ const size_t _buff_capacity;
+ //! The max number of outstanding async messages that a block can have at any time
+ const size_t _max_outstanding_async_msgs;
+ //! The clock rate of the clock that drives the ctrlport endpoint
+ const double _ctrl_clk_freq;
+ //! The clock rate of the primary timebase used for timed commands
+ const double _timebase_freq;
+
+ //! The function to call to handle an async message
+ async_msg_callback_t _handle_async_msg = [](uint32_t, uint32_t) {};
+ //! The current control sequence number of outgoing packets
+ uint8_t _tx_seq_num = 0;
+ //! The number of occupied words in the downstream buffer
+ ssize_t _buff_occupied = 0;
+ //! The arguments for the policy that governs this register interface
+ policy_args _policy;
+ //! A condition variable that hold the "downstream buffer is free" condition
+ std::condition_variable _buff_free_cond;
+ //! A queue that holds all outstanding requests
+ std::queue<ctrl_payload> _req_queue;
+ //! A queue that holds all outstanding responses and their status
+ std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
+ //! A condition variable that hold the "response is available" condition
+ std::condition_variable _resp_ready_cond;
+ //! A mutex to protect all state in this class
+ std::mutex _mutex;
+};
+
+ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,
+ sep_id_t this_epid,
+ uint16_t local_port,
+ size_t buff_capacity,
+ size_t max_outstanding_async_msgs,
+ double ctrl_clk_freq,
+ double timebase_freq)
+{
+ return std::make_shared<ctrlport_endpoint_impl>(handle_send,
+ this_epid,
+ local_port,
+ buff_capacity,
+ max_outstanding_async_msgs,
+ ctrl_clk_freq,
+ timebase_freq);
+}