From bffef674fbbcd892967017e81515bb76e0b850b5 Mon Sep 17 00:00:00 2001 From: Ciro Nishiguchi Date: Thu, 8 Aug 2019 10:25:20 -0500 Subject: rfnoc: tx_streamer: add support for async messages Add an async message queue that aggregates errors from multiple sources. Errors can come from the strs packets originating from the stream endpoint or from the radio block through control packets to the host. --- .../include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 41 ++++++++++++++++-- .../lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp | 17 ++++++++ .../include/uhdlib/rfnoc/tx_async_msg_queue.hpp | 49 ++++++++++++++++++++++ 3 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp (limited to 'host/lib/include/uhdlib/rfnoc') diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 0fb0ab5d1..63c2b24cb 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP #define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#include #include #include #include @@ -103,8 +104,9 @@ private: class chdr_tx_data_xport { public: - using uptr = std::unique_ptr; - using buff_t = transport::frame_buff; + using uptr = std::unique_ptr; + using buff_t = transport::frame_buff; + using enqueue_async_msg_fn_t = std::function; //! Information about data packet struct packet_info_t @@ -214,6 +216,16 @@ public: return _send_io->get_send_buff(timeout_ms); } + /*! + * Configure a function to call to enqueue async msgs + * + * \param fn Function to enqueue async messages + */ + void set_enqueue_async_msg_fn(enqueue_async_msg_fn_t fn) + { + _enqueue_async_msg = fn; + } + /*! * Sends a TX data packet * @@ -286,7 +298,27 @@ private: _fc_state.update_dest_recv_count( {strs.xfer_count_bytes, static_cast(strs.xfer_count_pkts)}); - // TODO: check strs status here and push into async msg queue + if (strs.status != chdr::STRS_OKAY) { + switch (strs.status) { + case chdr::STRS_SEQERR: + UHD_LOG_FASTPATH("S"); + if (_enqueue_async_msg) { + _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); + } + break; + case chdr::STRS_DATAERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); + break; + case chdr::STRS_RTERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!"); + break; + case chdr::STRS_CMDERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!"); + break; + default: + break; + } + } // Packet belongs to this transport, release buff and return true recv_link->release_recv_buff(std::move(buff)); @@ -522,6 +554,9 @@ private: // Handles sending of strc flow control ack packets detail::tx_flow_ctrl_sender _fc_sender; + // Function to enqueue an async msg + enqueue_async_msg_fn_t _enqueue_async_msg; + // Local / Source EPID sep_id_t _epid; }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp index 3bfc9d05a..3e006f7f9 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -78,9 +79,25 @@ public: */ void connect_channel(const size_t channel, chdr_tx_data_xport::uptr xport); + /*! Receive an asynchronous message from this tx stream + * + * Implementation of tx_streamer API method. + * + * \param async_metadata the metadata to be filled in + * \param timeout the timeout in seconds to wait for a message + * \return true when the async_metadata is valid, false for timeout + */ + bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout); + private: void _register_props(const size_t chan, const std::string& otw_format); + void _handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action); + + // Queue for async messages + tx_async_msg_queue::sptr _async_msg_queue; + // Properties std::vector> _scaling_out; std::vector> _samp_rate_out; diff --git a/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp new file mode 100644 index 000000000..181a31754 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp @@ -0,0 +1,49 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP +#define INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP + +#include +#include + +namespace uhd { namespace rfnoc { + +/*! + * Implements queue of async messages originating from the tx data transport + * and from the rfnoc graph. + */ +class tx_async_msg_queue +{ +public: + using sptr = std::shared_ptr; + + //! Constructor + tx_async_msg_queue(size_t capacity); + + /*! + * Retrieve async message from queue + * + * \param async_metadata the metadata to be filled in + * \param timeout_ms the timeout in milliseconds to wait for a message + * \return true when the async_metadata is valid, false for timeout + */ + bool recv_async_msg(async_metadata_t& async_metadata, int32_t timeout_ms); + + /*! + * Push an async message onto the queue + * + * \param async_metadata the metadata to be pushed + */ + void enqueue(const async_metadata_t& async_metadata); + +private: + boost::lockfree::queue _queue; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP */ -- cgit v1.2.3