diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-08-08 10:25:20 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:36 -0800 |
commit | bffef674fbbcd892967017e81515bb76e0b850b5 (patch) | |
tree | 8f56eb8548d0fb56094b555ae11d16eb61e6c381 /host/lib/include | |
parent | 91e01c484475600fcd659bb433ab86efa5146426 (diff) | |
download | uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.gz uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.bz2 uhd-bffef674fbbcd892967017e81515bb76e0b850b5.zip |
rfnoc: tx_streamer: add support for async messages
Add an async message queue that aggregates errors from multiple sources.
Errors can come from the strs packets originating from the stream
endpoint or from the radio block through control packets to the host.
Diffstat (limited to 'host/lib/include')
5 files changed, 114 insertions, 10 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 0fb0ab5d1..63c2b24cb 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP #define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#include <uhd/types/metadata.hpp> #include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/chdr_types.hpp> #include <uhdlib/rfnoc/mgmt_portal.hpp> @@ -103,8 +104,9 @@ private: class chdr_tx_data_xport { public: - using uptr = std::unique_ptr<chdr_tx_data_xport>; - using buff_t = transport::frame_buff; + using uptr = std::unique_ptr<chdr_tx_data_xport>; + using buff_t = transport::frame_buff; + using enqueue_async_msg_fn_t = std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>; //! Information about data packet struct packet_info_t @@ -215,6 +217,16 @@ public: } /*! + * Configure a function to call to enqueue async msgs + * + * \param fn Function to enqueue async messages + */ + void set_enqueue_async_msg_fn(enqueue_async_msg_fn_t fn) + { + _enqueue_async_msg = fn; + } + + /*! * Sends a TX data packet * * \param buff the frame buffer containing the packet to send @@ -286,7 +298,27 @@ private: _fc_state.update_dest_recv_count( {strs.xfer_count_bytes, static_cast<uint32_t>(strs.xfer_count_pkts)}); - // TODO: check strs status here and push into async msg queue + if (strs.status != chdr::STRS_OKAY) { + switch (strs.status) { + case chdr::STRS_SEQERR: + UHD_LOG_FASTPATH("S"); + if (_enqueue_async_msg) { + _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); + } + break; + case chdr::STRS_DATAERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); + break; + case chdr::STRS_RTERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!"); + break; + case chdr::STRS_CMDERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!"); + break; + default: + break; + } + } // Packet belongs to this transport, release buff and return true recv_link->release_recv_buff(std::move(buff)); @@ -522,6 +554,9 @@ private: // Handles sending of strc flow control ack packets detail::tx_flow_ctrl_sender _fc_sender; + // Function to enqueue an async msg + enqueue_async_msg_fn_t _enqueue_async_msg; + // Local / Source EPID sep_id_t _epid; }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp index 3bfc9d05a..3e006f7f9 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -9,6 +9,7 @@ #include <uhd/rfnoc/node.hpp> #include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/rfnoc/tx_async_msg_queue.hpp> #include <uhdlib/transport/tx_streamer_impl.hpp> #include <string> @@ -78,9 +79,25 @@ public: */ void connect_channel(const size_t channel, chdr_tx_data_xport::uptr xport); + /*! Receive an asynchronous message from this tx stream + * + * Implementation of tx_streamer API method. + * + * \param async_metadata the metadata to be filled in + * \param timeout the timeout in seconds to wait for a message + * \return true when the async_metadata is valid, false for timeout + */ + bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout); + private: void _register_props(const size_t chan, const std::string& otw_format); + void _handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action); + + // Queue for async messages + tx_async_msg_queue::sptr _async_msg_queue; + // Properties std::vector<property_t<double>> _scaling_out; std::vector<property_t<double>> _samp_rate_out; diff --git a/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp new file mode 100644 index 000000000..181a31754 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp @@ -0,0 +1,49 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP +#define INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP + +#include <uhd/types/metadata.hpp> +#include <boost/lockfree/queue.hpp> + +namespace uhd { namespace rfnoc { + +/*! + * Implements queue of async messages originating from the tx data transport + * and from the rfnoc graph. + */ +class tx_async_msg_queue +{ +public: + using sptr = std::shared_ptr<tx_async_msg_queue>; + + //! Constructor + tx_async_msg_queue(size_t capacity); + + /*! + * Retrieve async message from queue + * + * \param async_metadata the metadata to be filled in + * \param timeout_ms the timeout in milliseconds to wait for a message + * \return true when the async_metadata is valid, false for timeout + */ + bool recv_async_msg(async_metadata_t& async_metadata, int32_t timeout_ms); + + /*! + * Push an async message onto the queue + * + * \param async_metadata the metadata to be pushed + */ + void enqueue(const async_metadata_t& async_metadata); + +private: + boost::lockfree::queue<async_metadata_t> _queue; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index 819ed5558..35a724fa9 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -99,7 +99,6 @@ public: return _spp; } - /*! Get width of each over-the-wire item component. For complex items, * returns the width of one component only (real or imaginary). */ @@ -178,15 +177,13 @@ public: } } - //! Implementation of rx_streamer API method - bool recv_async_msg( - uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) +protected: + //! Returns the tick rate for conversion of timestamp + double get_tick_rate() const { - // TODO: implement me - return false; + return _zero_copy_streamer.get_tick_rate(); } -protected: //! Returns the size in bytes of a sample in a packet size_t get_mtu() const { diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp index 1b6f55238..5ac7a1e8c 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -51,6 +51,12 @@ public: return _xports.size(); } + //! Returns the tick rate for conversion of timestamp + double get_tick_rate() const + { + return _tick_rate; + } + //! Configures tick rate for conversion of timestamp void set_tick_rate(const double rate) { |