Diffstat (limited to 'host/lib/rfnoc')
-rw-r--r--  host/lib/rfnoc/CMakeLists.txt          |   2
-rw-r--r--  host/lib/rfnoc/chdr/chdr_types.cpp     |   3
-rw-r--r--  host/lib/rfnoc/chdr_ctrl_endpoint.cpp  | 180
-rw-r--r--  host/lib/rfnoc/ctrlport_endpoint.cpp   | 471
4 files changed, 655 insertions(+), 1 deletion(-)
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt
index 6aab0d499..0d989b83b 100644
--- a/host/lib/rfnoc/CMakeLists.txt
+++ b/host/lib/rfnoc/CMakeLists.txt
@@ -27,6 +27,8 @@ LIBUHD_APPEND_SOURCES(
     ${CMAKE_CURRENT_SOURCE_DIR}/node_ctrl_base.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/node.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/rate_node_ctrl.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/ctrlport_endpoint.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/chdr_ctrl_endpoint.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/rx_stream_terminator.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/scalar_node_ctrl.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/sink_block_ctrl_base.cpp
diff --git a/host/lib/rfnoc/chdr/chdr_types.cpp b/host/lib/rfnoc/chdr/chdr_types.cpp
index 8786b8193..8920e4fe3 100644
--- a/host/lib/rfnoc/chdr/chdr_types.cpp
+++ b/host/lib/rfnoc/chdr/chdr_types.cpp
@@ -57,7 +57,8 @@ size_t ctrl_payload::serialize(uint64_t* buff,
         | ((static_cast<uint64_t>(data_vtr.size()) & mask_u64(NUM_DATA_WIDTH))
               << NUM_DATA_OFFSET)
         | ((static_cast<uint64_t>(seq_num) & mask_u64(SEQ_NUM_WIDTH)) << SEQ_NUM_OFFSET)
-        | ((static_cast<uint64_t>(timestamp ? 1 : 0) & mask_u64(HAS_TIME_WIDTH))
+        | ((static_cast<uint64_t>(timestamp.is_initialized() ? 1 : 0)
+               & mask_u64(HAS_TIME_WIDTH))
               << HAS_TIME_OFFSET)
         | ((static_cast<uint64_t>(is_ack) & mask_u64(IS_ACK_WIDTH)) << IS_ACK_OFFSET)
         | ((static_cast<uint64_t>(src_epid) & mask_u64(SRC_EPID_WIDTH))
diff --git a/host/lib/rfnoc/chdr_ctrl_endpoint.cpp b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
new file mode 100644
index 000000000..cab90fb49
--- /dev/null
+++ b/host/lib/rfnoc/chdr_ctrl_endpoint.cpp
@@ -0,0 +1,180 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/safe_call.hpp>
+#include <uhd/utils/thread.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_types.hpp>
+#include <uhdlib/rfnoc/chdr_ctrl_endpoint.hpp>
+#include <boost/format.hpp>
+#include <mutex>
+#include <thread>
+#include <atomic>
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::chdr;
+
+using namespace std::placeholders;
+
+chdr_ctrl_endpoint::~chdr_ctrl_endpoint() = default;
+
+class chdr_ctrl_endpoint_impl : public chdr_ctrl_endpoint
+{
+public:
+    chdr_ctrl_endpoint_impl(const both_xports_t& xports,
+        const chdr::chdr_packet_factory& pkt_factory,
+        sep_id_t dst_epid,
+        sep_id_t my_epid)
+        : _dst_epid(dst_epid)
+        , _my_epid(my_epid)
+        , _xports(xports)
+        , _send_seqnum(0)
+        , _send_pkt(pkt_factory.make_ctrl())
+        , _recv_pkt(pkt_factory.make_ctrl())
+        , _stop_recv_thread(false)
+        , _recv_thread([this]() { recv_worker(); })
+    {
+        const std::string thread_name(str(boost::format("uhd_ctrl_ep%04x") % _my_epid));
+        uhd::set_thread_name(&_recv_thread, thread_name);
+        UHD_LOG_DEBUG("RFNOC",
+            boost::format("Started thread %s to process messages for CHDR Ctrl EP for "
+                          "EPID %d -> EPID %d")
+                % thread_name % _my_epid % _dst_epid);
+    }
+
+    virtual ~chdr_ctrl_endpoint_impl()
+    {
+        UHD_SAFE_CALL(
+            // Interrupt buffer updater loop
+            _stop_recv_thread = true;
+            // Wait for loop to finish
+            // No timeout on join. The recv loop is guaranteed to terminate in a
+            // reasonable amount of time because there are no timed blocks on the
+            // underlying transport.
+            _recv_thread.join();
+            // Flush base transport
+            while (_xports.recv->get_recv_buff(0.0001)) /*NOP*/;
+            // Release child endpoints
+            _endpoint_map.clear(););
+    }
+
+    virtual ctrlport_endpoint::sptr get_ctrlport_ep(uint16_t port,
+        size_t buff_capacity,
+        size_t max_outstanding_async_msgs,
+        double ctrl_clk_freq,
+        double timebase_freq)
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+
+        // Function to send a control payload
+        auto send_fn = [this](const ctrl_payload& payload, double timeout) {
+            std::lock_guard<std::mutex> lock(_mutex);
+            // Build header
+            chdr_header header;
+            header.set_pkt_type(PKT_TYPE_CTRL);
+            header.set_num_mdata(0);
+            header.set_seq_num(_send_seqnum++);
+            header.set_dst_epid(_dst_epid);
+            // Acquire send buffer and send the packet
+            auto send_buff = _xports.send->get_send_buff(timeout);
+            _send_pkt->refresh(send_buff->cast<void*>(), header, payload);
+            send_buff->commit(header.get_length());
+        };
+
+        if (_endpoint_map.find(port) == _endpoint_map.end()) {
+            ctrlport_endpoint::sptr ctrlport_ep = ctrlport_endpoint::make(send_fn,
+                _my_epid,
+                port,
+                buff_capacity,
+                max_outstanding_async_msgs,
+                ctrl_clk_freq,
+                timebase_freq);
+            _endpoint_map.insert(std::make_pair(port, ctrlport_ep));
+            UHD_LOG_DEBUG("RFNOC",
+                boost::format("Created ctrlport endpoint for port %d on EPID %d") % port
+                    % _my_epid);
+            return ctrlport_ep;
+        } else {
+            return _endpoint_map.at(port);
+        }
+    }
+
+    virtual size_t get_num_drops() const
+    {
+        return _num_drops;
+    }
+
+private:
+    void recv_worker()
+    {
+        // Run forever:
+        // - Pull packets from the base transport
+        // - Route them based on the dst_port
+        // - Pass them to the ctrlport_endpoint for additional processing
+        while (not _stop_recv_thread) {
+            auto buff = _xports.recv->get_recv_buff(0.0);
+            if (buff) {
+                std::lock_guard<std::mutex> lock(_mutex);
+                try {
+                    _recv_pkt->refresh(buff->cast<void*>());
+                    const ctrl_payload payload = _recv_pkt->get_payload();
+                    if (_endpoint_map.find(payload.dst_port) != _endpoint_map.end()) {
+                        _endpoint_map.at(payload.dst_port)->handle_recv(payload);
+                    }
+                } catch (...) {
+                    // Ignore all errors
+                }
+            } else {
+                // Be a good citizen and yield if no packet is processed
+                static const size_t MIN_DUR = 1;
+                boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
+                // We call sleep(MIN_DUR) above instead of yield() to ensure that we
+                // relinquish the current scheduler time slot.
+                // yield() is a hint to the scheduler to end the time
+                // slice early and schedule in another thread that is ready to run.
+                // However in most situations, there will be no other thread and
+                // this thread will continue to run which will rail a CPU core.
+                // We call sleep(MIN_DUR=1) instead which will sleep for a minimum
+                // time. Ideally we would like to use boost::chrono::.*seconds::min()
+                // but that is bound to 0, which causes the sleep_for call to be a
+                // no-op and thus useless to actually force a sleep.
+            }
+        }
+    }
+
+    // The endpoint ID of the destination
+    const sep_id_t _dst_epid;
+    // The endpoint ID of this software endpoint
+    const sep_id_t _my_epid;
+    // Send/recv transports
+    const uhd::both_xports_t _xports;
+    // The current sequence number for a send packet
+    size_t _send_seqnum = 0;
+    // The number of packets dropped
+    size_t _num_drops = 0;
+    // Packet containers
+    chdr_ctrl_packet::uptr _send_pkt;
+    chdr_ctrl_packet::cuptr _recv_pkt;
+    // A collection of ctrlport endpoints (keyed by the port number)
+    std::map<uint16_t, ctrlport_endpoint::sptr> _endpoint_map;
+    // A thread that will handle all responses and async message requests
+    std::atomic_bool _stop_recv_thread;
+    std::thread _recv_thread;
+    // Mutex that protects all state in this class
+    std::mutex _mutex;
+};
+
+chdr_ctrl_endpoint::uptr chdr_ctrl_endpoint::make(const both_xports_t& xports,
+    const chdr::chdr_packet_factory& pkt_factory,
+    sep_id_t dst_epid,
+    sep_id_t my_epid)
+{
+    return std::make_unique<chdr_ctrl_endpoint_impl>(
+        xports, pkt_factory, dst_epid, my_epid);
+}
diff --git a/host/lib/rfnoc/ctrlport_endpoint.cpp b/host/lib/rfnoc/ctrlport_endpoint.cpp
new file mode 100644
index 000000000..ade5b89ee
--- /dev/null
+++ b/host/lib/rfnoc/ctrlport_endpoint.cpp
@@ -0,0 +1,471 @@
+//
+// Copyright 2019 Ettus Research, a National Instruments Brand
+//
+// SPDX-License-Identifier: GPL-3.0-or-later
+//
+
+#include <uhd/exception.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_packet.hpp>
+#include <uhdlib/rfnoc/chdr/chdr_types.hpp>
+#include <uhdlib/rfnoc/ctrlport_endpoint.hpp>
+#include <condition_variable>
+#include <boost/format.hpp>
+#include <mutex>
+#include <queue>
+
+
+using namespace uhd;
+using namespace uhd::rfnoc;
+using namespace uhd::rfnoc::chdr;
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+namespace {
+//! Max async msg (CTRL_WRITE) size in 32-bit words (2 hdr, 2 TS, 1 op-word, 1 data)
+constexpr size_t ASYNC_MESSAGE_SIZE = 6;
+//! Default completion timeout for transactions
+constexpr double DEFAULT_TIMEOUT = 0.1;
+//! Default value for whether ACKs are always required
+constexpr bool DEFAULT_FORCE_ACKS = false;
+}
+
+ctrlport_endpoint::~ctrlport_endpoint() = default;
+
+class ctrlport_endpoint_impl : public ctrlport_endpoint
+{
+public:
+    ctrlport_endpoint_impl(const send_fn_t& send_fcn,
+        sep_id_t my_epid,
+        uint16_t local_port,
+        size_t buff_capacity,
+        size_t max_outstanding_async_msgs,
+        double ctrl_clk_freq,
+        double timebase_freq)
+        : _handle_send(send_fcn)
+        , _my_epid(my_epid)
+        , _local_port(local_port)
+        , _buff_capacity(buff_capacity)
+        , _max_outstanding_async_msgs(max_outstanding_async_msgs)
+        , _ctrl_clk_freq(ctrl_clk_freq)
+        , _timebase_freq(timebase_freq)
+    {
+    }
+
+    virtual ~ctrlport_endpoint_impl() = default;
+
+    virtual void poke32(uint32_t addr,
+        uint32_t data,
+        uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+        bool ack                   = false)
+    {
+        // Compute transaction expiration time
+        auto timeout_time = start_timeout(_policy.timeout);
+        // Send request
+        auto request =
+            send_request_packet(OP_WRITE, addr, {data}, timestamp, timeout_time);
+        // Optionally wait for an ACK
+        if (ack || _policy.force_acks) {
+            wait_for_ack(request, timeout_time);
+        }
+    }
+
+    virtual void multi_poke32(const std::vector<uint32_t> addrs,
+        const std::vector<uint32_t> data,
+        uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+        bool ack                   = false)
+    {
+        if (addrs.size() != data.size()) {
+            throw uhd::value_error("addrs and data vectors must be of the same length");
+        }
+        for (size_t i = 0; i < data.size(); i++) {
+            poke32(addrs[i],
+                data[i],
+                (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
+                (i == data.size() - 1) ? ack : false);
+        }
+    }
+
+    virtual void block_poke32(uint32_t first_addr,
+        const std::vector<uint32_t> data,
+        uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+        bool ack                   = false)
+    {
+        for (size_t i = 0; i < data.size(); i++) {
+            poke32(first_addr + (i * sizeof(uint32_t)),
+                data[i],
+                (i == 0) ? timestamp : uhd::time_spec_t::ASAP,
+                (i == data.size() - 1) ? ack : false);
+        }
+
+        /* TODO: Uncomment when the atomic block poke is implemented in the FPGA
+        // Compute transaction expiration time
+        auto timeout_time = start_timeout(_policy.timeout);
+        // Send request
+        auto request = send_request_packet(
+            OP_BLOCK_WRITE, first_addr, data, timestamp, timeout_time);
+        // Optionally wait for an ACK
+        if (ack || _policy.force_acks) {
+            wait_for_ack(request, timeout_time);
+        }
+        */
+    }
+
+    virtual uint32_t peek32(
+        uint32_t addr, uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
+    {
+        // Compute transaction expiration time
+        auto timeout_time = start_timeout(_policy.timeout);
+        // Send request
+        auto request =
+            send_request_packet(OP_READ, addr, {uint32_t(0)}, timestamp, timeout_time);
+        // Wait for an ACK
+        auto response = wait_for_ack(request, timeout_time);
+        return response.data_vtr[0];
+    }
+
+    virtual std::vector<uint32_t> block_peek32(uint32_t first_addr,
+        size_t length,
+        uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP)
+    {
+        std::vector<uint32_t> values;
+        for (size_t i = 0; i < length; i++) {
+            values.push_back(peek32(first_addr + (i * sizeof(uint32_t)),
+                (i == 0) ? timestamp : uhd::time_spec_t::ASAP));
+        }
+        return values;
+
+        /* TODO: Uncomment when the atomic block peek is implemented in the FPGA
+        // Compute transaction expiration time
+        auto timeout_time = start_timeout(_policy.timeout);
+        // Send request
+        auto request = send_request_packet(OP_READ,
+            first_addr,
+            std::vector<uint32_t>(length, 0),
+            timestamp,
+            timeout_time);
+        // Wait for an ACK
+        auto response = wait_for_ack(request, timeout_time);
+        return response.data_vtr;
+        */
+    }
+
+    virtual void poll32(uint32_t addr,
+        uint32_t data,
+        uint32_t mask,
+        uhd::time_spec_t timeout,
+        uhd::time_spec_t timestamp = uhd::time_spec_t::ASAP,
+        bool ack                   = false)
+    {
+        // TODO: Uncomment when this is implemented in the FPGA
+        throw uhd::not_implemented_error("Control poll not implemented in the FPGA");
+
+        // Compute transaction expiration time
+        auto timeout_time = start_timeout(_policy.timeout);
+        // Send request
+        auto request = send_request_packet(OP_POLL,
+            addr,
+            {data, mask, static_cast<uint32_t>(timeout.get_real_secs() * _ctrl_clk_freq)},
+            timestamp,
+            timeout_time);
+        // Optionally wait for an ACK
+        if (ack || _policy.force_acks) {
+            wait_for_ack(request, timeout_time);
+        }
+    }
+
+    virtual void sleep(uhd::time_spec_t duration, bool ack = false)
+    {
+        // Compute transaction expiration time
+        auto timeout_time = start_timeout(_policy.timeout);
+        // Send request
+        auto request = send_request_packet(OP_SLEEP,
+            0,
+            {static_cast<uint32_t>(duration.get_real_secs() * _ctrl_clk_freq)},
+            uhd::time_spec_t::ASAP,
+            timeout_time);
+        // Optionally wait for an ACK
+        if (ack || _policy.force_acks) {
+            wait_for_ack(request, timeout_time);
+        }
+    }
+
+    virtual void register_async_msg_handler(async_msg_callback_t callback_f)
+    {
+        _handle_async_msg = callback_f;
+    }
+
+    virtual void set_policy(const std::string& name, const uhd::device_addr_t& args)
+    {
+        if (name == "default") {
+            _policy.timeout    = args.cast<double>("timeout", DEFAULT_TIMEOUT);
+            _policy.force_acks = DEFAULT_FORCE_ACKS;
+        } else {
+            // TODO: Uncomment when custom policies are implemented
+            throw uhd::not_implemented_error("Policy not implemented in the FPGA");
+        }
+    }
+
+    virtual void handle_recv(const ctrl_payload& rx_ctrl)
+    {
+        std::unique_lock<std::mutex> lock(_mutex);
+        if (rx_ctrl.is_ack) {
+            // Function to process a response with no sequence errors
+            auto process_correct_response = [this, rx_ctrl]() {
+                response_status_t resp_status = RESP_VALID;
+                // Grant flow control credits
+                _buff_occupied -= get_payload_size(_req_queue.front());
+                _buff_free_cond.notify_one();
+                if (get_payload_size(_req_queue.front()) != get_payload_size(rx_ctrl)) {
+                    resp_status = RESP_SIZEERR;
+                }
+                // Pop the request from the queue
+                _req_queue.pop();
+                // Push the response into the response queue
+                _resp_queue.push(std::make_tuple(rx_ctrl, resp_status));
+                _resp_ready_cond.notify_one();
+            };
+            // Function to process a response with sequence errors
+            auto process_incorrect_response = [this]() {
+                // Grant flow control credits
+                _buff_occupied -= get_payload_size(_req_queue.front());
+                _buff_free_cond.notify_one();
+                // Push a fabricated response into the response queue
+                ctrl_payload resp(_req_queue.front());
+                resp.is_ack = true;
+                _resp_queue.push(std::make_tuple(resp, RESP_DROPPED));
+                _resp_ready_cond.notify_one();
+                // Pop the request from the queue
+                _req_queue.pop();
+            };
+
+            // Peek at the request queue to check the expected sequence number
+            int8_t seq_num_diff = int8_t(rx_ctrl.seq_num - _req_queue.front().seq_num);
+            if (seq_num_diff == 0) { // No sequence error
+                process_correct_response();
+            } else if (seq_num_diff > 0) { // Packet(s) dropped
+                // Tag all dropped packets
+                for (int8_t i = 0; i < seq_num_diff; i++) {
+                    process_incorrect_response();
+                }
+                // Process correct response
+                process_correct_response();
+            } else { // Reordered packet(s)
+                // Requests are processed in order. If seq_num_diff is negative then we
+                // have either already seen this response or we have dropped >128
+                // responses. Either way, ignore this packet.
+            }
+        } else {
+            // Handle asynchronous message callback
+            ctrl_status_t status = CMD_CMDERR;
+            if (rx_ctrl.op_code != OP_WRITE) {
+                UHD_LOG_ERROR(
+                    "CTRLEP", "Malformed async message request: Invalid opcode");
+            } else if (rx_ctrl.dst_port != _local_port) {
+                UHD_LOG_ERROR("CTRLEP", "Malformed async message request: Invalid port");
+            } else if (rx_ctrl.data_vtr.empty()) {
+                UHD_LOG_ERROR(
+                    "CTRLEP", "Malformed async message request: Invalid num_data");
+            } else {
+                try {
+                    _handle_async_msg(rx_ctrl.address, rx_ctrl.data_vtr[0]);
+                    status = CMD_OKAY;
+                } catch (...) {
+                    UHD_LOG_ERROR("CTRLEP", "Async message handler threw an exception");
+                }
+            }
+            try {
+                // Respond with an ACK packet
+                // Flow control not needed because we have allocated special room in the
+                // buffer for async message responses
+                ctrl_payload tx_ctrl(rx_ctrl);
+                tx_ctrl.is_ack   = true;
+                tx_ctrl.src_epid = _my_epid;
+                tx_ctrl.status   = status;
+                _handle_send(tx_ctrl, _policy.timeout);
+            } catch (...) {
+                UHD_LOG_ERROR("CTRLEP",
+                    "Encountered an error sending a response for an async message");
+            }
+        }
+    }
+
+private:
+    //! Returns the length of the control payload in 32-bit words
+    inline static size_t get_payload_size(const ctrl_payload& payload)
+    {
+        return 2 + (payload.timestamp.is_initialized() ? 2 : 0) + payload.data_vtr.size();
+    }
+
+    //! Marks the start of a timeout for an operation and returns the expiration time
+    inline const steady_clock::time_point start_timeout(double duration)
+    {
+        return steady_clock::now() + (static_cast<int>(std::ceil(duration / 1e-6)) * 1us);
+    }
+
+    //! Sends a request control packet to a remote device
+    const ctrl_payload send_request_packet(ctrl_opcode_t op_code,
+        uint32_t address,
+        const std::vector<uint32_t>& data_vtr,
+        const uhd::time_spec_t& time_spec,
+        const steady_clock::time_point& timeout_time)
+    {
+        std::unique_lock<std::mutex> lock(_mutex);
+
+        // Convert from uhd::time_spec to timestamp
+        boost::optional<uint64_t> timestamp;
+        if (time_spec != time_spec_t::ASAP) {
+            timestamp = time_spec.to_ticks(_timebase_freq);
+        }
+
+        // Assemble the control payload
+        ctrl_payload tx_ctrl;
+        tx_ctrl.dst_port    = _local_port;
+        tx_ctrl.src_port    = _local_port;
+        tx_ctrl.seq_num     = _tx_seq_num;
+        tx_ctrl.timestamp   = timestamp;
+        tx_ctrl.is_ack      = false;
+        tx_ctrl.src_epid    = _my_epid;
+        tx_ctrl.address     = address;
+        tx_ctrl.data_vtr    = data_vtr;
+        tx_ctrl.byte_enable = 0xF;
+        tx_ctrl.op_code     = op_code;
+        tx_ctrl.status      = CMD_OKAY;
+
+        // Perform flow control
+        // If there is no room in the downstream buffer, then wait until the timeout
+        size_t pyld_size   = get_payload_size(tx_ctrl);
+        auto buff_not_full = [this, pyld_size]() -> bool {
+            // Allocate room in the queue for one async response packet
+            // If we can fit the current request in the queue then we can proceed
+            return (_buff_occupied + pyld_size)
+                   <= (_buff_capacity
+                          - (ASYNC_MESSAGE_SIZE * _max_outstanding_async_msgs));
+        };
+        if (!buff_not_full()) {
+            if (not _buff_free_cond.wait_until(lock, timeout_time, buff_not_full)) {
+                throw uhd::op_timeout(
+                    "Control operation timed out waiting for space in command buffer");
+            }
+        }
+        _buff_occupied += pyld_size;
+        _req_queue.push(tx_ctrl);
+
+        // Send the payload as soon as there is room in the buffer
+        _handle_send(tx_ctrl, _policy.timeout);
+        _tx_seq_num = (_tx_seq_num + 1) % 64;
+
+        return tx_ctrl;
+    }
+
+    //! Waits for and returns the ACK for the specified request
+    const ctrl_payload wait_for_ack(
+        const ctrl_payload& request, const steady_clock::time_point& timeout_time)
+    {
+        std::unique_lock<std::mutex> lock(_mutex);
+
+        auto resp_ready = [this]() -> bool { return !_resp_queue.empty(); };
+        while (true) {
+            // Wait until there is a response in the response queue
+            if (!resp_ready()) {
+                if (not _resp_ready_cond.wait_until(lock, timeout_time, resp_ready)) {
+                    throw uhd::op_timeout("Control operation timed out waiting for ACK");
+                }
+            }
+            // Extract the response packet
+            ctrl_payload rx_ctrl;
+            response_status_t resp_status;
+            std::tie(rx_ctrl, resp_status) = _resp_queue.front();
+            _resp_queue.pop();
+            // Check if this is the response meant for the request
+            // Filter by op_code, address and seq_num
+            if (rx_ctrl.seq_num == request.seq_num && rx_ctrl.op_code == request.op_code
+                && rx_ctrl.address == request.address) {
+                // Validate transaction status
+                if (rx_ctrl.status == CMD_CMDERR) {
+                    throw uhd::op_failed("Control operation returned a failing status");
+                } else if (rx_ctrl.status == CMD_TSERR) {
+                    throw uhd::op_timerr("Control operation returned a timestamp error");
+                }
+                // Check data vector size
+                if (rx_ctrl.data_vtr.size() == 0) {
+                    throw uhd::op_failed(
+                        "Control operation returned a malformed response");
+                }
+                // Validate response status
+                if (resp_status == RESP_DROPPED) {
+                    throw uhd::op_seqerr(
+                        "Response for a control transaction was dropped");
+                } else if (resp_status == RESP_RTERR) {
+                    throw uhd::op_timerr("Control operation encountered a routing error");
+                }
+                return rx_ctrl;
+            } else {
+                // This response does not belong to the request we passed in. Move on.
+                continue;
+            }
+        }
+    }
+
+
+    //! The parameters associated with the policy that governs this object
+    struct policy_args
+    {
+        double timeout  = DEFAULT_TIMEOUT;
+        bool force_acks = DEFAULT_FORCE_ACKS;
+    };
+    //! The software status (different from the transaction status) of the response
+    enum response_status_t { RESP_VALID, RESP_DROPPED, RESP_RTERR, RESP_SIZEERR };
+
+    //! Function to call to send a control packet
+    const send_fn_t _handle_send;
+    //! The endpoint ID of this software endpoint
+    const sep_id_t _my_epid;
+    //! The local port number on the control crossbar for this ctrlport endpoint
+    const uint16_t _local_port;
+    //! The downstream buffer capacity in 32-bit words (used for flow control)
+    const size_t _buff_capacity;
+    //! The max number of outstanding async messages that a block can have at any time
+    const size_t _max_outstanding_async_msgs;
+    //! The clock rate of the clock that drives the ctrlport endpoint
+    const double _ctrl_clk_freq;
+    //! The clock rate of the primary timebase used for timed commands
+    const double _timebase_freq;
+
+    //! The function to call to handle an async message
+    async_msg_callback_t _handle_async_msg = [](uint32_t, uint32_t) {};
+    //! The current control sequence number of outgoing packets
+    uint8_t _tx_seq_num = 0;
+    //! The number of occupied words in the downstream buffer
+    ssize_t _buff_occupied = 0;
+    //! The arguments for the policy that governs this register interface
+    policy_args _policy;
+    //! A condition variable that holds the "downstream buffer is free" condition
+    std::condition_variable _buff_free_cond;
+    //! A queue that holds all outstanding requests
+    std::queue<ctrl_payload> _req_queue;
+    //! A queue that holds all outstanding responses and their status
+    std::queue<std::tuple<ctrl_payload, response_status_t>> _resp_queue;
+    //! A condition variable that holds the "response is available" condition
+    std::condition_variable _resp_ready_cond;
+    //! A mutex to protect all state in this class
+    std::mutex _mutex;
+};
+
+ctrlport_endpoint::sptr ctrlport_endpoint::make(const send_fn_t& handle_send,
+    sep_id_t this_epid,
+    uint16_t local_port,
+    size_t buff_capacity,
+    size_t max_outstanding_async_msgs,
+    double ctrl_clk_freq,
+    double timebase_freq)
+{
+    return std::make_shared<ctrlport_endpoint_impl>(handle_send,
+        this_epid,
+        local_port,
+        buff_capacity,
+        max_outstanding_async_msgs,
+        ctrl_clk_freq,
+        timebase_freq);
+}
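Usage note (not part of the change): the sketch below shows how the two new classes are intended to fit together, using only the interfaces added in this diff. The transport pair, packet factory, EPIDs, port number, buffer capacity, register address, and clock rates are placeholder assumptions supplied by the caller, not values taken from this commit.

    // Illustrative sketch only -- assumes the caller has already created the CHDR
    // transports (xports), a packet factory, and the source/destination EPIDs.
    #include <uhdlib/rfnoc/chdr/chdr_packet.hpp>
    #include <uhdlib/rfnoc/chdr_ctrl_endpoint.hpp>
    #include <uhdlib/rfnoc/ctrlport_endpoint.hpp>

    using namespace uhd::rfnoc;

    void example_ctrl_access(const uhd::both_xports_t& xports,
        const chdr::chdr_packet_factory& pkt_factory,
        const sep_id_t dst_epid,
        const sep_id_t my_epid)
    {
        // One software endpoint per CHDR control stream; it owns the recv thread
        // that demuxes ACKs and async messages to ctrlport endpoints by dst_port.
        chdr_ctrl_endpoint::uptr ctrl_ep =
            chdr_ctrl_endpoint::make(xports, pkt_factory, dst_epid, my_epid);

        // Placeholder parameters: ctrlport 0, a 32-word downstream buffer, one
        // outstanding async message, 200 MHz ctrlport clock, 200 MHz timebase.
        ctrlport_endpoint::sptr regs = ctrl_ep->get_ctrlport_ep(0 /*port*/,
            32 /*buff_capacity*/,
            1 /*max_outstanding_async_msgs*/,
            200e6 /*ctrl_clk_freq*/,
            200e6 /*timebase_freq*/);

        // Placeholder register address 0x0010: a flow-controlled write that does
        // not wait for the ACK...
        regs->poke32(0x0010, 0xABCD1234, uhd::time_spec_t::ASAP, false);
        // ...and a read that blocks until the ACK payload arrives (or times out).
        const uint32_t readback = regs->peek32(0x0010, uhd::time_spec_t::ASAP);
        (void)readback;
    }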
