diff options
author | Ashish Chaudhari <ashish@ettus.com> | 2019-05-20 17:14:20 -0700 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:15 -0800 |
commit | 0b698810a163e3986939341fee5014fc6ad7e7f9 (patch) | |
tree | 22e2c122542928e29a2a0a8f6d383ad12e65b6a8 /host/lib | |
parent | ad4004f1f78d5b64dae50b6e456d0206e824978f (diff) | |
download | uhd-0b698810a163e3986939341fee5014fc6ad7e7f9.tar.gz uhd-0b698810a163e3986939341fee5014fc6ad7e7f9.tar.bz2 uhd-0b698810a163e3986939341fee5014fc6ad7e7f9.zip |
rfnoc: Added impl for reg_iface and ctrl_endpoint
- Added new register_iface class that translates high-level
peek/poke calls into CHDR control payloads
- Added new chdr_ctrl_endpoint class that emulates a control
stream endpoint in SW. It can create and handle multiple
register interfaces
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr/chdr_types.hpp | 8 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp | 62 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/ctrlport_endpoint.hpp | 61 | ||||
-rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr/chdr_types.cpp | 3 | ||||
-rw-r--r-- | host/lib/rfnoc/chdr_ctrl_endpoint.cpp | 180 | ||||
-rw-r--r-- | host/lib/rfnoc/ctrlport_endpoint.cpp | 471 |
7 files changed, 786 insertions, 1 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr/chdr_types.hpp b/host/lib/include/uhdlib/rfnoc/chdr/chdr_types.hpp index 8363e1bcf..447e7db91 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr/chdr_types.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr/chdr_types.hpp @@ -309,6 +309,8 @@ public: // Functions ctrl_payload(const ctrl_payload& rhs) = default; ctrl_payload(ctrl_payload&& rhs) = default; + ctrl_payload& operator=(const ctrl_payload& rhs) = default; + //! Populate the header for this type of packet void populate_header(chdr_header& header) const; @@ -422,6 +424,8 @@ public: // Functions strs_payload(const strs_payload& rhs) = default; strs_payload(strs_payload&& rhs) = default; + strs_payload& operator=(const strs_payload& rhs) = default; + //! Populate the header for this type of packet void populate_header(chdr_header& header) const; @@ -517,6 +521,8 @@ public: // Functions strc_payload(const strc_payload& rhs) = default; strc_payload(strc_payload&& rhs) = default; + strc_payload& operator=(const strc_payload& rhs) = default; + //! Populate the header for this type of packet void populate_header(chdr_header& header) const; @@ -751,6 +757,8 @@ public: mgmt_payload(const mgmt_payload& rhs) = default; mgmt_payload(mgmt_payload&& rhs) = default; + mgmt_payload& operator=(const mgmt_payload& rhs) = default; + inline void set_header(sep_id_t src_epid, uint16_t protover, chdr_w_t chdr_w) { _src_epid = src_epid; diff --git a/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp new file mode 100644 index 000000000..c13955888 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/chdr_ctrl_endpoint.hpp @@ -0,0 +1,62 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP +#define INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP + +#include <uhdlib/rfnoc/chdr/chdr_types.hpp> +#include <uhdlib/rfnoc/ctrlport_endpoint.hpp> +#include <uhdlib/rfnoc/xports.hpp> +#include <functional> +#include <memory> + +namespace uhd { namespace rfnoc { + +/*! A software interface that represents a CHDR control endpoint + * + * The endpoint is capable of sending/receiving CHDR packets + * and creating multiple ctrlport_endpoint interfaces + */ +class chdr_ctrl_endpoint +{ +public: + using uptr = std::unique_ptr<chdr_ctrl_endpoint>; + + virtual ~chdr_ctrl_endpoint() = 0; + + //! Creates a new register interface for the specified port + // + // \param port The port number on the control crossbar + // \param buff_capacity The buffer capacity of the downstream buff in 32-bit words + // \param max_outstanding_async_msgs Max outstanding async messages allowed + // \param ctrl_clk_freq Frequency of the clock driving the ctrlport logic + // \param timebase_freq Frequency of the timebase (for timed commands) + // + virtual ctrlport_endpoint::sptr get_ctrlport_ep(uint16_t port, + size_t buff_capacity, + size_t max_outstanding_async_msgs, + double ctrl_clk_freq, + double timebase_freq) = 0; + + //! Returns the number of dropped packets due to misclassification + virtual size_t get_num_drops() const = 0; + + //! Creates a control endpoint object + // + // \param xports The transports used to send and recv packets + // \param pkt_factor An instance of the CHDR packet factory + // \param my_epid The endpoint ID of this software endpoint + // + static uptr make(const both_xports_t& xports, + const chdr::chdr_packet_factory& pkt_factory, + sep_id_t dst_epid, + sep_id_t my_epid); + +}; // class chdr_ctrl_endpoint + +}} /* namespace uhd::rfnoc */ + +#endif /* INCLUDED_LIBUHD_RFNOC_CHDR_CTRL_ENDPOINT_HPP */ diff --git a/host/lib/include/uhdlib/rfnoc/ctrlport_endpoint.hpp b/host/lib/include/uhdlib/rfnoc/ctrlport_endpoint.hpp new file mode 100644 index 000000000..d135f284c --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/ctrlport_endpoint.hpp @@ -0,0 +1,61 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_RFNOC_CTRLPORT_ENDPOINT_HPP +#define INCLUDED_LIBUHD_RFNOC_CTRLPORT_ENDPOINT_HPP + +#include <uhd/rfnoc/register_iface.hpp> +#include <uhdlib/rfnoc/chdr/chdr_types.hpp> +#include <memory> + +namespace uhd { namespace rfnoc { + +/*! A software interface that represents a ControlPort endpoint + * + * This interface supports the following: + * - All capabilities of register_iface + * - A function to handle received packets + * - A static factory class to create these endpoints + */ +class ctrlport_endpoint : public register_iface +{ +public: + using sptr = std::shared_ptr<ctrlport_endpoint>; + + //! The function to call when sending a packet to a remote device + using send_fn_t = std::function<void(const chdr::ctrl_payload&, double)>; + + virtual ~ctrlport_endpoint() = 0; + + //! Handles an incoming control packet (request and response) + // + // \param rx_ctrl The control payload of the received packet + // + virtual void handle_recv(const chdr::ctrl_payload& rx_ctrl) = 0; + + //! Creates a new register interface (ctrl_portendpoint) + // + // \param handle_send The function to call to send a control packet + // \param my_epid The endpoint ID of the SW control stream endpoint + // \param port The port number on the control crossbar + // \param buff_capacity The buffer capacity of the downstream buff in 32-bit words + // \param max_outstanding_async_msgs Max outstanding async messages allowed + // \param ctrl_clk_freq Frequency of the clock driving the ctrlport logic + // \param timebase_freq Frequency of the timebase (for timed commands) + // + static sptr make(const send_fn_t& handle_send, + sep_id_t my_epid, + uint16_t local_port, + size_t buff_capacity, + size_t max_outstanding_async_msgs, + double ctrl_clk_freq, + double timebase_freq); + +}; // class ctrlport_endpoint + +}} /* namespace uhd::rfnoc */ + +#endif /* INCLUDED_LIBUHD_RFNOC_CTRLPORT_ENDPOINT_HPP */ diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index 6aab0d499..0d989b83b 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -27,6 +27,8 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/node_ctrl_base.cpp ${CMAKE_CURRENT_SOURCE_DIR}/node.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rate_node_ctrl.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/ctrlport_endpoint.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_endpoint.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rx_stream_terminator.cpp ${CMAKE_CURRENT_SOURCE_DIR}/scalar_node_ctrl.cpp ${CMAKE_CURRENT_SOURCE_DIR}/sink_block_ctrl_base.cpp diff --git a/host/lib/rfnoc/chdr/chdr_types.cpp b/host/lib/rfnoc/chdr/chdr_types.cpp index 8786b8193..8920e4fe3 100644 --- a/host/lib/rfnoc/chdr/chdr_types.cpp +++ b/host/lib/rfnoc/chdr/chdr_types.cpp @@ -57,7 +57,8 @@ size_t ctrl_payload::serialize(uint64_t* buff, | ((static_cast<uint64_t>(data_vtr.size()) & mask_u64(NUM_DATA_WIDTH)) << NUM_DATA_OFFSET) | ((static_cast<uint64_t>(seq_num) & mask_u64(SEQ_NUM_WIDTH)) << SEQ_NUM_OFFSET) - | ((static_cast<uint64_t>(timestamp ? 1 : 0) & mask_u64(HAS_TIME_WIDTH)) + | ((static_cast<uint64_t>(timestamp.is_initialized() ? 1 : 0) + & mask_u64(HAS_TIME_WIDTH)) << HAS_TIME_OFFSET) | ((static_cast<uint64_t>(is_ack) & mask_u64(IS_ACK_WIDTH)) << IS_ACK_OFFSET) | ((static_cast<uint64_t>(src_epid) & mask_u64(SRC_EPID_WIDTH)) diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp new file mode 100644 index 000000000..cab90fb49 --- /dev/null +++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp @@ -0,0 +1,180 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/utils/safe_call.hpp> +#include <uhd/utils/thread.hpp> +#include <uhdlib/rfnoc/chdr/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr/chdr_types.hpp> +#include <uhdlib/rfnoc/chdr_ctrl_endpoint.hpp> +#include <boost/format.hpp> +#include <mutex> +#include <thread> +#include <atomic> + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::chdr; + +using namespace std::placeholders; + +chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default; + +class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint +{ +public: + chdr_ctrl_endpoint_impl(const both_xports_t& xports, + const chdr::chdr_packet_factory& pkt_factory, + sep_id_t dst_epid, + sep_id_t my_epid) + : _dst_epid(dst_epid) + , _my_epid(my_epid) + , _xports(xports) + , _send_seqnum(0) + , _send_pkt(pkt_factory.make_ctrl()) + , _recv_pkt(pkt_factory.make_ctrl()) + , _stop_recv_thread(false) + , _recv_thread([this]() { recv_worker(); }) + { + const std::string thread_name(str(boost::format("uhd_ctrl_ep%04x") % _my_epid)); + uhd::set_thread_name(&_recv_thread, thread_name); + UHD_LOG_DEBUG("RFNOC", + boost::format("Started thread %s to process messages for CHDR Ctrl EP for " + "EPID %d -> EPID %d") + % thread_name % _my_epid % _dst_epid); + } + + virtual ~chdr_ctrl_endpoint_impl() + { + UHD_SAFE_CALL( + // Interrupt buffer updater loop + _stop_recv_thread = true; + // Wait for loop to finish + // No timeout on join. The recv loop is guaranteed + // to terminate in a reasonable amount of time because + // there are no timed blocks on the underlying. + _recv_thread.join(); + // Flush base transport + while (_xports.recv->get_recv_buff(0.0001)) /*NOP*/; + // Release child endpoints + _endpoint_map.clear();); + } + + virtual ctrlport_endpoint::sptr get_ctrlport_ep(uint16_t port, + size_t buff_capacity, + size_t max_outstanding_async_msgs, + double ctrl_clk_freq, + double timebase_freq) + { + std::lock_guard<std::mutex> lock(_mutex); + + // Function to send a control payload + auto send_fn = [this](const ctrl_payload& payload, double timeout) { + std::lock_guard<std::mutex> lock(_mutex); + // Build header + chdr_header header; + header.set_pkt_type(PKT_TYPE_CTRL); + header.set_num_mdata(0); + header.set_seq_num(_send_seqnum++); + header.set_dst_epid(_dst_epid); + // Acquire send buffer and send the packet + auto send_buff = _xports.send->get_send_buff(timeout); + _send_pkt->refresh(send_buff->cast<void*>(), header, payload); + send_buff->commit(header.get_length()); + }; + + if (_endpoint_map.find(port) == _endpoint_map.end()) { + ctrlport_endpoint::sptr ctrlport_ep = ctrlport_endpoint::make(send_fn, + _my_epid, + port, + buff_capacity, + max_outstanding_async_msgs, + ctrl_clk_freq, + timebase_freq); + _endpoint_map.insert(std::make_pair(port, ctrlport_ep)); + UHD_LOG_DEBUG("RFNOC", + boost::format("Created ctrlport endpoint for port %d on EPID %d") % port + % _my_epid); + return ctrlport_ep; + } else { + return _endpoint_map.at(port); + } + } + + virtual size_t get_num_drops() const + { + return _num_drops; + } + +private: + void recv_worker() + { + // Run forever: + // - Pull packets from the base transport + // - Route them based on the dst_port + // - Pass them to the ctrlport_endpoint for additional processing + while (not _stop_recv_thread) { + auto buff = _xports.recv->get_recv_buff(0.0); + if (buff) { + std::lock_guard<std::mutex> lock(_mutex); + try { + _recv_pkt->refresh(buff->cast<void*>()); + const ctrl_payload payload = _recv_pkt->get_payload(); + if (_endpoint_map.find(payload.dst_port) != _endpoint_map.end()) { + _endpoint_map.at(payload.dst_port)->handle_recv(payload); + } + } catch (...) { + // Ignore all errors + } + } else { + // Be a good citizen and yield if no packet is processed + static const size_t MIN_DUR = 1; + boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR)); + // We call sleep(MIN_DUR) above instead of yield() to ensure that we + // relinquish the current scheduler time slot. + // yield() is a hint to the scheduler to end the time + // slice early and schedule in another thread that is ready to run. + // However in most situations, there will be no other thread and + // this thread will continue to run which will rail a CPU core. + // We call sleep(MIN_DUR=1) instead which will sleep for a minimum + // time. Ideally we would like to use boost::chrono::.*seconds::min() + // but that is bound to 0, which causes the sleep_for call to be a + // no-op and thus useless to actually force a sleep. + } + } + } + + // The endpoint ID of the destination + const sep_id_t _dst_epid; + // The endpoint ID of this software endpoint + const sep_id_t _my_epid; + // Send/recv transports + const uhd::both_xports_t _xports; + // The curent sequence number for a send packet + size_t _send_seqnum = 0; + // The number of packets dropped + size_t _num_drops = 0; + // Packet containers + chdr_ctrl_packet::uptr _send_pkt; + chdr_ctrl_packet::cuptr _recv_pkt; + // A collection of ctrlport endpoints (keyed by the port number) + std::map<uint16_t, ctrlport_endpoint::sptr> _endpoint_map; + // A thread that will handle all responses and async message requests + std::atomic_bool _stop_recv_thread; + std::thread _recv_thread; + // Mutex that protects all state in this class + std::mutex _mutex; +}; + +chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const both_xports_t& xports, + const chdr::chdr_packet_factory& pkt_factory, + sep_id_t dst_epid, + sep_id_t my_epid) +{ + return std::make_unique<chdr_ctrl_endpoint_impl>( + xports, pkt_factory, dst_epid, my_epid); +} diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp new file mode 100644 index 000000000..ade5b89ee --- /dev/null +++ b/host/lib/rfnoc/ctrlport_endpoint.cpp @@ -0,0 +1,471 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/exception.hpp> +#include <uhd/utils/log.hpp> +#include <uhdlib/rfnoc/chdr/chdr_packet.hpp> +#include <uhdlib/rfnoc/chdr/chdr_types.hpp> +#include <uhdlib/rfnoc/ctrlport_endpoint.hpp> +#include <condition_variable> +#include <boost/format.hpp> +#include <mutex> +#include <queue> + + +using namespace uhd; +using namespace uhd::rfnoc; +using namespace uhd::rfnoc::chdr; + +using namespace std::chrono; +using namespace std::chrono_literals; + +namespace { +//! Max async msg (CTRL_WRITE) size in 32-bit words (2 hdr, 2 TS, 1 op-word, 1 data) +constexpr size_t ASYNC_MESSAGE_SIZE = 6; +//! Default completion timeout for transactions +constexpr double DEFAULT_TIMEOUT = 0.1; +//! Default value for whether ACKs are always required +constexpr bool DEFAULT_FORCE_ACKS = false; +} + +ctrlport_endpoint::~ctrlport_endpoint() = default; + +class ctrlport_endpoint_impl : public ctrlport_endpoint +{ +public: + ctrlport_endpoint_impl(const send_fn_t& send_fcn, + sep_id_t my_epid, + uint16_t local_port, + size_t buff_capacity, + size_t max_outstanding_async_msgs, + double ctrl_clk_freq, + double timebase_freq) + : _handle_send(send_fcn) + , _my_epid(my_epid) + , _local_port(local_port) + , _buff_capacity(buff_capacity) + , _max_outstanding_async_msgs(max_outstanding_async_msgs) + , _ctrl_clk_freq(ctrl_clk_freq) + , _timebase_freq(timebase_freq) + { + } + + virtual ~ctrlport_endpoint_impl() = default; + + virtual void poke32(uint32_t addr, + uint32_t data, + uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP, + bool ack = false) + { + // Compute transaction expiration time + auto timeout_time = start_timeout(_policy.timeout); + // Send request + auto request = + send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time); + // Optionally wait for an ACK + if (ack || _policy.force_acks) { + wait_for_ack(request, timeout_time); + } + } + + virtual void multi_poke32(const std::vector<uint32_t> addrs, + const std::vector<uint32_t> data, + uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP, + bool ack = false) + { + if (addrs.size() != data.size()) { + throw uhd::value_error("addrs and data vectors must be of the same length"); + } + for (size_t i = 0; i < data.size(); i++) { + poke32(addrs[i], + data[i], + (i == 0) ? timestamp : uhd::time_spec_t::ASAP, + (i == data.size() - 1) ? ack : false); + } + } + + virtual void block_poke32(uint32_t first_addr, + const std::vector<uint32_t> data, + uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP, + bool ack = false) + { + for (size_t i = 0; i < data.size(); i++) { + poke32(first_addr + (i * sizeof(uint32_t)), + data[i], + (i == 0) ? timestamp : uhd::time_spec_t::ASAP, + (i == data.size() - 1) ? ack : false); + } + + /* TODO: Uncomment when the atomic block poke is implemented in the FPGA + // Compute transaction expiration time + auto timeout_time = start_timeout(_policy.timeout); + // Send request + auto request = send_request_packet( + OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time); + // Optionally wait for an ACK + if (ack || _policy.force_acks) { + wait_for_ack(request, timeout_time); + } + */ + } + + virtual uint32_t peek32( + uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) + { + // Compute transaction expiration time + auto timeout_time = start_timeout(_policy.timeout); + // Send request + auto request = + send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time); + // Wait for an ACK + auto response = wait_for_ack(request, timeout_time); + return response.data_vtr[0]; + } + + virtual std::vector<uint32_t> block_peek32(uint32_t first_addr, + size_t length, + uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP) + { + std::vector<uint32_t> values; + for (size_t i = 0; i < length; i++) { + values.push_back(peek32(first_addr + (i * sizeof(uint32_t)), + (i == 0) ? timestamp : uhd::time_spec_t::ASAP)); + } + return values; + + /* TODO: Uncomment when the atomic block peek is implemented in the FPGA + // Compute transaction expiration time + auto timeout_time = start_timeout(_policy.timeout); + // Send request + auto request = send_request_packet(OP_READ, + first_addr, + std::vector<uint32_t>(length, 0), + timestamp, + timeout_time); + // Wait for an ACK + auto response = wait_for_ack(request, timeout_time); + return response.data_vtr; + */ + } + + virtual void poll32(uint32_t addr, + uint32_t data, + uint32_t mask, + uhd::time_spec_t timeout, + uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP, + bool ack = false) + { + // TODO: Uncomment when this is implemented in the FPGA + throw uhd::not_implemented_error("Control poll not implemented in the FPGA"); + + // Compute transaction expiration time + auto timeout_time = start_timeout(_policy.timeout); + // Send request + auto request = send_request_packet(OP_POLL, + addr, + {data, mask, static_cast<uint32_t>(timeout.get_real_secs() * _ctrl_clk_freq)}, + timestamp, + timeout_time); + // Optionally wait for an ACK + if (ack || _policy.force_acks) { + wait_for_ack(request, timeout_time); + } + } + + virtual void sleep(uhd::time_spec_t duration, bool ack = false) + { + // Compute transaction expiration time + auto timeout_time = start_timeout(_policy.timeout); + // Send request + auto request = send_request_packet(OP_SLEEP, + 0, + {static_cast<uint32_t>(duration.get_real_secs() * _ctrl_clk_freq)}, + uhd::time_spec_t::ASAP, + timeout_time); + // Optionally wait for an ACK + if (ack || _policy.force_acks) { + wait_for_ack(request, timeout_time); + } + } + + virtual void register_async_msg_handler(async_msg_callback_t callback_f) + { + _handle_async_msg = callback_f; + } + + virtual void set_policy(const std::string& name, const uhd::device_addr_t& args) + { + if (name == "default") { + _policy.timeout = args.cast<double>("timeout", DEFAULT_TIMEOUT); + _policy.force_acks = DEFAULT_FORCE_ACKS; + } else { + // TODO: Uncomment when custom policies are implemented + throw uhd::not_implemented_error("Policy implemented in the FPGA"); + } + } + + virtual void handle_recv(const ctrl_payload& rx_ctrl) + { + std::unique_lock<std::mutex> lock(_mutex); + if (rx_ctrl.is_ack) { + // Function to process a response with no sequence errors + auto process_correct_response = [this, rx_ctrl]() { + response_status_t resp_status = RESP_VALID; + // Grant flow control credits + _buff_occupied -= get_payload_size(_req_queue.front()); + _buff_free_cond.notify_one(); + if (get_payload_size(_req_queue.front()) != get_payload_size(rx_ctrl)) { + resp_status = RESP_SIZEERR; + } + // Pop the request from the queue + _req_queue.pop(); + // Push the response into the response queue + _resp_queue.push(std::make_tuple(rx_ctrl, resp_status)); + _resp_ready_cond.notify_one(); + }; + // Function to process a response with sequence errors + auto process_incorrect_response = [this]() { + // Grant flow control credits + _buff_occupied -= get_payload_size(_req_queue.front()); + _buff_free_cond.notify_one(); + // Push a fabricated response into the response queue + ctrl_payload resp(_req_queue.front()); + resp.is_ack = true; + _resp_queue.push(std::make_tuple(resp, RESP_DROPPED)); + _resp_ready_cond.notify_one(); + // Pop the request from the queue + _req_queue.pop(); + }; + + // Peek at the request queue to check the expected sequence number + int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num); + if (seq_num_diff == 0) { // No sequence error + process_correct_response(); + } else if (seq_num_diff > 0) { // Packet(s) dropped + // Tag all dropped packets + for (int8_t i = 0; i < seq_num_diff; i++) { + process_incorrect_response(); + } + // Process correct response + process_correct_response(); + } else { // Reordered packet(s) + // Requests are processed in order. If seq_num_diff is negative then we + // have either already seen this response or we have dropped >128 + // responses. Either way ignore this packet. + } + } else { + // Handle asynchronous message callback + ctrl_status_t status = CMD_CMDERR; + if (rx_ctrl.op_code != OP_WRITE) { + UHD_LOG_ERROR( + "CTRLEP", "Malformed async message request: Invalid opcode"); + } else if (rx_ctrl.dst_port != _local_port) { + UHD_LOG_ERROR("CTRLEP", "Malformed async message request: Invalid port"); + } else if (rx_ctrl.data_vtr.empty()) { + UHD_LOG_ERROR( + "CTRLEP", "Malformed async message request: Invalid num_data"); + } else { + try { + _handle_async_msg(rx_ctrl.address, rx_ctrl.data_vtr[0]); + status = CMD_OKAY; + } catch (...) { + UHD_LOG_ERROR("CTRLEP", "Async message handler threw an exception"); + } + } + try { + // Respond with an ACK packet + // Flow control not needed because we have allocated special room in the + // buffer for async message responses + ctrl_payload tx_ctrl(rx_ctrl); + tx_ctrl.is_ack = true; + tx_ctrl.src_epid = _my_epid; + tx_ctrl.status = status; + _handle_send(tx_ctrl, _policy.timeout); + } catch (...) { + UHD_LOG_ERROR("CTRLEP", + "Encountered an error sending a response for an async message"); + } + } + } + +private: + //! Returns the length of the control payload in 32-bit words + inline static size_t get_payload_size(const ctrl_payload& payload) + { + return 2 + (payload.timestamp.is_initialized() ? 2 : 0) + payload.data_vtr.size(); + } + + //! Marks the start of a timeout for an operation and returns the expiration time + inline const steady_clock::time_point start_timeout(double duration) + { + return steady_clock::now() + (static_cast<int>(std::ceil(duration / 1e-6)) * 1us); + } + + //! Sends a request control packet to a remote device + const ctrl_payload send_request_packet(ctrl_opcode_t op_code, + uint32_t address, + const std::vector<uint32_t>& data_vtr, + const uhd::time_spec_t& time_spec, + const steady_clock::time_point& timeout_time) + { + std::unique_lock<std::mutex> lock(_mutex); + + // Convert from uhd::time_spec to timestamp + boost::optional<uint64_t> timestamp; + if (time_spec != time_spec_t::ASAP) { + timestamp = time_spec.to_ticks(_timebase_freq); + } + + // Assemble the control payload + ctrl_payload tx_ctrl; + tx_ctrl.dst_port = _local_port; + tx_ctrl.src_port = _local_port; + tx_ctrl.seq_num = _tx_seq_num; + tx_ctrl.timestamp = timestamp; + tx_ctrl.is_ack = false; + tx_ctrl.src_epid = _my_epid; + tx_ctrl.address = address; + tx_ctrl.data_vtr = data_vtr; + tx_ctrl.byte_enable = 0xF; + tx_ctrl.op_code = op_code; + tx_ctrl.status = CMD_OKAY; + + // Perform flow control + // If there is no room in the downstream buffer, then wait until the timeout + size_t pyld_size = get_payload_size(tx_ctrl); + auto buff_not_full = [this, pyld_size]() -> bool { + // Allocate room in the queue for one async response packet + // If we can fit the current request in the queue then we can proceed + return (_buff_occupied + pyld_size) + <= (_buff_capacity + - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs)); + }; + if (!buff_not_full()) { + if (not _buff_free_cond.wait_until(lock, timeout_time, buff_not_full)) { + throw uhd::op_timeout( + "Control operation timed out waiting for space in command buffer"); + } + } + _buff_occupied += pyld_size; + _req_queue.push(tx_ctrl); + + // Send the payload as soon as there is room in the buffer + _handle_send(tx_ctrl, _policy.timeout); + _tx_seq_num = (_tx_seq_num + 1) % 64; + + return tx_ctrl; + } + + //! Waits for and returns the ACK for the specified request + const ctrl_payload wait_for_ack( + const ctrl_payload& request, const steady_clock::time_point& timeout_time) + { + std::unique_lock<std::mutex> lock(_mutex); + + auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); }; + while (true) { + // Wait until there is a response in the response queue + if (!resp_ready()) { + if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) { + throw uhd::op_timeout("Control operation timed out waiting for ACK"); + } + } + // Extract the response packet + ctrl_payload rx_ctrl; + response_status_t resp_status; + std::tie(rx_ctrl, resp_status) = _resp_queue.front(); + _resp_queue.pop(); + // Check if this is the response meant for the request + // Filter by op_code, address and seq_num + if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code + && rx_ctrl.address == request.address) { + // Validate transaction status + if (rx_ctrl.status == CMD_CMDERR) { + throw uhd::op_failed("Control operation returned a failing status"); + } else if (rx_ctrl.status == CMD_TSERR) { + throw uhd::op_timerr("Control operation returned a timestamp error"); + } + // Check data vector size + if (rx_ctrl.data_vtr.size() == 0) { + throw uhd::op_failed( + "Control operation returned a malformed response"); + } + // Validate response status + if (resp_status == RESP_DROPPED) { + throw uhd::op_seqerr( + "Response for a control transaction was dropped"); + } else if (resp_status == RESP_RTERR) { + throw uhd::op_timerr("Control operation encountered a routing error"); + } + return rx_ctrl; + } else { + // This response does not belong to the request we passed in. Move on. + continue; + } + } + } + + + //! The parameters associated with the policy that governs this object + struct policy_args + { + double timeout = DEFAULT_TIMEOUT; + bool force_acks = DEFAULT_FORCE_ACKS; + }; + //! The software status (different from the transaction status) of the response + enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR }; + + //! Function to call to send a control packet + const send_fn_t _handle_send; + //! The endpoint ID of this software endpoint + const sep_id_t _my_epid; + //! The local port number on the control crossbar for this ctrlport endpoint + const uint16_t _local_port; + //! The downstream buffer capacity in 32-bit words (used for flow control) + const size_t _buff_capacity; + //! The max number of outstanding async messages that a block can have at any time + const size_t _max_outstanding_async_msgs; + //! The clock rate of the clock that drives the ctrlport endpoint + const double _ctrl_clk_freq; + //! The clock rate of the primary timebase used for timed commands + const double _timebase_freq; + + //! The function to call to handle an async message + async_msg_callback_t _handle_async_msg = [](uint32_t, uint32_t) {}; + //! The current control sequence number of outgoing packets + uint8_t _tx_seq_num = 0; + //! The number of occupied words in the downstream buffer + ssize_t _buff_occupied = 0; + //! The arguments for the policy that governs this register interface + policy_args _policy; + //! A condition variable that hold the "downstream buffer is free" condition + std::condition_variable _buff_free_cond; + //! A queue that holds all outstanding requests + std::queue<ctrl_payload> _req_queue; + //! A queue that holds all outstanding responses and their status + std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue; + //! A condition variable that hold the "response is available" condition + std::condition_variable _resp_ready_cond; + //! A mutex to protect all state in this class + std::mutex _mutex; +}; + +ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send, + sep_id_t this_epid, + uint16_t local_port, + size_t buff_capacity, + size_t max_outstanding_async_msgs, + double ctrl_clk_freq, + double timebase_freq) +{ + return std::make_shared<ctrlport_endpoint_impl>(handle_send, + this_epid, + local_port, + buff_capacity, + max_outstanding_async_msgs, + ctrl_clk_freq, + timebase_freq); +} |