diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-08-08 10:25:20 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:36 -0800 |
commit | bffef674fbbcd892967017e81515bb76e0b850b5 (patch) | |
tree | 8f56eb8548d0fb56094b555ae11d16eb61e6c381 /host/lib | |
parent | 91e01c484475600fcd659bb433ab86efa5146426 (diff) | |
download | uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.gz uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.bz2 uhd-bffef674fbbcd892967017e81515bb76e0b850b5.zip |
rfnoc: tx_streamer: add support for async messages
Add an async message queue that aggregates errors from multiple sources.
Errors can come from the strs packets originating from the stream
endpoint or from the radio block through control packets to the host.
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp | 41 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp | 17 | ||||
-rw-r--r-- | host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp | 49 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_impl.hpp | 11 | ||||
-rw-r--r-- | host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp | 6 | ||||
-rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/rfnoc/actions.cpp | 20 | ||||
-rw-r--r-- | host/lib/rfnoc/radio_control_impl.cpp | 16 | ||||
-rw-r--r-- | host/lib/rfnoc/rfnoc_tx_streamer.cpp | 69 | ||||
-rw-r--r-- | host/lib/rfnoc/tx_async_msg_queue.cpp | 52 |
10 files changed, 262 insertions, 20 deletions
diff --git a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp index 0fb0ab5d1..63c2b24cb 100644 --- a/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp +++ b/host/lib/include/uhdlib/rfnoc/chdr_tx_data_xport.hpp @@ -7,6 +7,7 @@ #ifndef INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP #define INCLUDED_LIBUHD_CHDR_TX_DATA_XPORT_HPP +#include <uhd/types/metadata.hpp> #include <uhdlib/rfnoc/chdr_packet.hpp> #include <uhdlib/rfnoc/chdr_types.hpp> #include <uhdlib/rfnoc/mgmt_portal.hpp> @@ -103,8 +104,9 @@ private: class chdr_tx_data_xport { public: - using uptr = std::unique_ptr<chdr_tx_data_xport>; - using buff_t = transport::frame_buff; + using uptr = std::unique_ptr<chdr_tx_data_xport>; + using buff_t = transport::frame_buff; + using enqueue_async_msg_fn_t = std::function<void(async_metadata_t::event_code_t, bool, uint64_t)>; //! Information about data packet struct packet_info_t @@ -215,6 +217,16 @@ public: } /*! + * Configure a function to call to enqueue async msgs + * + * \param fn Function to enqueue async messages + */ + void set_enqueue_async_msg_fn(enqueue_async_msg_fn_t fn) + { + _enqueue_async_msg = fn; + } + + /*! * Sends a TX data packet * * \param buff the frame buffer containing the packet to send @@ -286,7 +298,27 @@ private: _fc_state.update_dest_recv_count( {strs.xfer_count_bytes, static_cast<uint32_t>(strs.xfer_count_pkts)}); - // TODO: check strs status here and push into async msg queue + if (strs.status != chdr::STRS_OKAY) { + switch (strs.status) { + case chdr::STRS_SEQERR: + UHD_LOG_FASTPATH("S"); + if (_enqueue_async_msg) { + _enqueue_async_msg(async_metadata_t::EVENT_CODE_SEQ_ERROR, false, 0); + } + break; + case chdr::STRS_DATAERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received data error in tx stream!"); + break; + case chdr::STRS_RTERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received routing error in tx stream!"); + break; + case chdr::STRS_CMDERR: + UHD_LOG_WARNING("XPORT::TX_DATA_XPORT", "Received command error in tx stream!"); + break; + default: + break; + } + } // Packet belongs to this transport, release buff and return true recv_link->release_recv_buff(std::move(buff)); @@ -522,6 +554,9 @@ private: // Handles sending of strc flow control ack packets detail::tx_flow_ctrl_sender _fc_sender; + // Function to enqueue an async msg + enqueue_async_msg_fn_t _enqueue_async_msg; + // Local / Source EPID sep_id_t _epid; }; diff --git a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp index 3bfc9d05a..3e006f7f9 100644 --- a/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp +++ b/host/lib/include/uhdlib/rfnoc/rfnoc_tx_streamer.hpp @@ -9,6 +9,7 @@ #include <uhd/rfnoc/node.hpp> #include <uhdlib/rfnoc/chdr_tx_data_xport.hpp> +#include <uhdlib/rfnoc/tx_async_msg_queue.hpp> #include <uhdlib/transport/tx_streamer_impl.hpp> #include <string> @@ -78,9 +79,25 @@ public: */ void connect_channel(const size_t channel, chdr_tx_data_xport::uptr xport); + /*! Receive an asynchronous message from this tx stream + * + * Implementation of tx_streamer API method. + * + * \param async_metadata the metadata to be filled in + * \param timeout the timeout in seconds to wait for a message + * \return true when the async_metadata is valid, false for timeout + */ + bool recv_async_msg(uhd::async_metadata_t& async_metadata, double timeout); + private: void _register_props(const size_t chan, const std::string& otw_format); + void _handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action); + + // Queue for async messages + tx_async_msg_queue::sptr _async_msg_queue; + // Properties std::vector<property_t<double>> _scaling_out; std::vector<property_t<double>> _samp_rate_out; diff --git a/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp new file mode 100644 index 000000000..181a31754 --- /dev/null +++ b/host/lib/include/uhdlib/rfnoc/tx_async_msg_queue.hpp @@ -0,0 +1,49 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP +#define INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP + +#include <uhd/types/metadata.hpp> +#include <boost/lockfree/queue.hpp> + +namespace uhd { namespace rfnoc { + +/*! + * Implements queue of async messages originating from the tx data transport + * and from the rfnoc graph. + */ +class tx_async_msg_queue +{ +public: + using sptr = std::shared_ptr<tx_async_msg_queue>; + + //! Constructor + tx_async_msg_queue(size_t capacity); + + /*! + * Retrieve async message from queue + * + * \param async_metadata the metadata to be filled in + * \param timeout_ms the timeout in milliseconds to wait for a message + * \return true when the async_metadata is valid, false for timeout + */ + bool recv_async_msg(async_metadata_t& async_metadata, int32_t timeout_ms); + + /*! + * Push an async message onto the queue + * + * \param async_metadata the metadata to be pushed + */ + void enqueue(const async_metadata_t& async_metadata); + +private: + boost::lockfree::queue<async_metadata_t> _queue; +}; + +}} // namespace uhd::rfnoc + +#endif /* INCLUDED_LIBUHD_TX_ASYNC_MSG_QUEUE_HPP */ diff --git a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp index 819ed5558..35a724fa9 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_impl.hpp @@ -99,7 +99,6 @@ public: return _spp; } - /*! Get width of each over-the-wire item component. For complex items, * returns the width of one component only (real or imaginary). */ @@ -178,15 +177,13 @@ public: } } - //! Implementation of rx_streamer API method - bool recv_async_msg( - uhd::async_metadata_t& /*async_metadata*/, double /*timeout = 0.1*/) +protected: + //! Returns the tick rate for conversion of timestamp + double get_tick_rate() const { - // TODO: implement me - return false; + return _zero_copy_streamer.get_tick_rate(); } -protected: //! Returns the size in bytes of a sample in a packet size_t get_mtu() const { diff --git a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp index 1b6f55238..5ac7a1e8c 100644 --- a/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp +++ b/host/lib/include/uhdlib/transport/tx_streamer_zero_copy.hpp @@ -51,6 +51,12 @@ public: return _xports.size(); } + //! Returns the tick rate for conversion of timestamp + double get_tick_rate() const + { + return _tick_rate; + } + //! Configures tick rate for conversion of timestamp void set_tick_rate(const double rate) { diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index a88507dcd..73de394e3 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -55,6 +55,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/tx_async_msg_queue.cpp # Default block control classes: ${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp index d4ed4559f..4de05b304 100644 --- a/host/lib/rfnoc/actions.cpp +++ b/host/lib/rfnoc/actions.cpp @@ -63,3 +63,23 @@ rx_event_action_info::sptr rx_event_action_info::make() }; return std::make_shared<rx_event_action_info_make_shared>(); } + +/*** TX Metadata Action Info *************************************************/ +tx_event_action_info::tx_event_action_info( + uhd::async_metadata_t::event_code_t event_code_) + : action_info(ACTION_KEY_TX_EVENT), event_code(event_code_) +{ +} + +tx_event_action_info::sptr tx_event_action_info::make( + uhd::async_metadata_t::event_code_t event_code) +{ + struct tx_event_action_info_make_shared : public tx_event_action_info + { + tx_event_action_info_make_shared(uhd::async_metadata_t::event_code_t event_code) + : tx_event_action_info(event_code) + { + } + }; + return std::make_shared<tx_event_action_info_make_shared>(event_code); +} diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp index 2094f4096..9d7257108 100644 --- a/host/lib/rfnoc/radio_control_impl.cpp +++ b/host/lib/rfnoc/radio_control_impl.cpp @@ -928,12 +928,24 @@ void radio_control_impl::async_message_handler( return; } switch (code) { - case err_codes::ERR_TX_UNDERRUN: + case err_codes::ERR_TX_UNDERRUN: { + auto tx_event_action = tx_event_action_info::make( + uhd::async_metadata_t::EVENT_CODE_UNDERFLOW); + post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, + tx_event_action); UHD_LOG_FASTPATH("U"); + RFNOC_LOG_TRACE("Posting underrun event action message."); break; - case err_codes::ERR_TX_LATE_DATA: + } + case err_codes::ERR_TX_LATE_DATA: { + auto tx_event_action = tx_event_action_info::make( + uhd::async_metadata_t::EVENT_CODE_TIME_ERROR); + post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, + tx_event_action); UHD_LOG_FASTPATH("L"); + RFNOC_LOG_TRACE("Posting late data event action message."); break; + } } break; } diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp index 61d714a85..4fc1a3ff8 100644 --- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp @@ -14,6 +14,7 @@ using namespace uhd::rfnoc; const std::string STREAMER_ID = "TxStreamer"; static std::atomic<uint64_t> streamer_inst_ctr; +static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000; rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args) @@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) , _stream_args(stream_args) { + _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE); + // No block to which to forward properties or actions set_prop_forwarding_policy(forwarding_policy_t::DROP); set_action_forwarding_policy(forwarding_policy_t::DROP); + register_action_handler(ACTION_KEY_TX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + tx_event_action_info::sptr tx_event_action = + std::dynamic_pointer_cast<tx_event_action_info>(action); + if (!tx_event_action) { + RFNOC_LOG_WARNING("Received invalid TX event action!"); + return; + } + _handle_tx_event_action(src, tx_event_action); + }); + // Initialize properties _scaling_out.reserve(num_chans); _samp_rate_out.reserve(num_chans); @@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, for (auto& mtu_prop : _mtu_out) { mtu_resolver_out.insert(&mtu_prop); } - //property_t<size_t>* mtu_out = &_mtu_out.back(); add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out), [&mtu_out = _mtu_out[i], i, this]() { @@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel( const size_t mtu = xport->get_max_payload_size(); set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel}); + xport->set_enqueue_async_msg_fn( + [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) { + async_metadata_t md; + md.channel = channel; + md.event_code = event_code; + md.has_time_spec = has_tsf; + + if (has_tsf) { + md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate()); + } + + this->_async_msg_queue->enqueue(md); + }); + tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport)); } -void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format) +bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata, + double timeout) +{ + const auto timeout_ms = static_cast<uint64_t>(timeout * 1000); + return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms); +} + +void rfnoc_tx_streamer::_register_props(const size_t chan, + const std::string& otw_format) { // Create actual properties and store them _scaling_out.push_back(property_t<double>( @@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot register_property(mtu_out); // Add resolvers - add_property_resolver({scaling_out}, {}, - [&scaling_out = *scaling_out, chan, this]() { + add_property_resolver( + {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan); if (scaling_out.is_valid()) { this->set_scale_factor(chan, 32767.0 / scaling_out.get()); } }); - add_property_resolver({samp_rate_out}, {}, - [&samp_rate_out = *samp_rate_out, chan, this]() { + add_property_resolver( + {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan); if (samp_rate_out.is_valid()) { this->set_samp_rate(samp_rate_out.get()); } }); - add_property_resolver({tick_rate_out}, {}, - [&tick_rate_out = *tick_rate_out, chan, this]() { + add_property_resolver( + {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan); if (tick_rate_out.is_valid()) { this->set_tick_rate(tick_rate_out.get()); } }); } + +void rfnoc_tx_streamer::_handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action) +{ + UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE); + + uhd::async_metadata_t md; + md.event_code = tx_event_action->event_code; + md.channel = src.instance; + md.has_time_spec = tx_event_action->has_tsf; + + if (md.has_time_spec) { + md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate()); + } + + RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel); + _async_msg_queue->enqueue(md); +} diff --git a/host/lib/rfnoc/tx_async_msg_queue.cpp b/host/lib/rfnoc/tx_async_msg_queue.cpp new file mode 100644 index 000000000..71a05074f --- /dev/null +++ b/host/lib/rfnoc/tx_async_msg_queue.cpp @@ -0,0 +1,52 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/tx_async_msg_queue.hpp> +#include <chrono> +#include <thread> + +using namespace uhd; +using namespace uhd::rfnoc; + +tx_async_msg_queue::tx_async_msg_queue(size_t capacity) + : _queue(capacity) +{ +} + +bool tx_async_msg_queue::recv_async_msg(uhd::async_metadata_t& async_metadata, + int32_t timeout_ms) +{ + using namespace std::chrono; + + if (timeout_ms == 0.0) { + return _queue.pop(async_metadata); + } + + const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + + bool last_check = false; + + while (true) { + if (_queue.pop(async_metadata)) { + return true; + } + + if (steady_clock::now() > end_time) { + if (last_check) { + return false; + } else { + last_check = true; + } + } + + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } +} + +void tx_async_msg_queue::enqueue(const async_metadata_t& async_metadata) +{ + _queue.push(async_metadata); +} |