diff options
author | Ciro Nishiguchi <ciro.nishiguchi@ni.com> | 2019-08-08 10:25:20 -0500 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-11-26 11:49:36 -0800 |
commit | bffef674fbbcd892967017e81515bb76e0b850b5 (patch) | |
tree | 8f56eb8548d0fb56094b555ae11d16eb61e6c381 /host/lib/rfnoc | |
parent | 91e01c484475600fcd659bb433ab86efa5146426 (diff) | |
download | uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.gz uhd-bffef674fbbcd892967017e81515bb76e0b850b5.tar.bz2 uhd-bffef674fbbcd892967017e81515bb76e0b850b5.zip |
rfnoc: tx_streamer: add support for async messages
Add an async message queue that aggregates errors from multiple sources.
Errors can come from the strs packets originating from the stream
endpoint or from the radio block through control packets to the host.
Diffstat (limited to 'host/lib/rfnoc')
-rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/rfnoc/actions.cpp | 20 | ||||
-rw-r--r-- | host/lib/rfnoc/radio_control_impl.cpp | 16 | ||||
-rw-r--r-- | host/lib/rfnoc/rfnoc_tx_streamer.cpp | 69 | ||||
-rw-r--r-- | host/lib/rfnoc/tx_async_msg_queue.cpp | 52 |
5 files changed, 148 insertions, 10 deletions
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index a88507dcd..73de394e3 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -55,6 +55,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/tx_async_msg_queue.cpp # Default block control classes: ${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp index d4ed4559f..4de05b304 100644 --- a/host/lib/rfnoc/actions.cpp +++ b/host/lib/rfnoc/actions.cpp @@ -63,3 +63,23 @@ rx_event_action_info::sptr rx_event_action_info::make() }; return std::make_shared<rx_event_action_info_make_shared>(); } + +/*** TX Metadata Action Info *************************************************/ +tx_event_action_info::tx_event_action_info( + uhd::async_metadata_t::event_code_t event_code_) + : action_info(ACTION_KEY_TX_EVENT), event_code(event_code_) +{ +} + +tx_event_action_info::sptr tx_event_action_info::make( + uhd::async_metadata_t::event_code_t event_code) +{ + struct tx_event_action_info_make_shared : public tx_event_action_info + { + tx_event_action_info_make_shared(uhd::async_metadata_t::event_code_t event_code) + : tx_event_action_info(event_code) + { + } + }; + return std::make_shared<tx_event_action_info_make_shared>(event_code); +} diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp index 2094f4096..9d7257108 100644 --- a/host/lib/rfnoc/radio_control_impl.cpp +++ b/host/lib/rfnoc/radio_control_impl.cpp @@ -928,12 +928,24 @@ void radio_control_impl::async_message_handler( return; } switch (code) { - case err_codes::ERR_TX_UNDERRUN: + case err_codes::ERR_TX_UNDERRUN: { + auto tx_event_action = tx_event_action_info::make( + uhd::async_metadata_t::EVENT_CODE_UNDERFLOW); + post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, + tx_event_action); UHD_LOG_FASTPATH("U"); + RFNOC_LOG_TRACE("Posting underrun event action message."); break; - case err_codes::ERR_TX_LATE_DATA: + } + case err_codes::ERR_TX_LATE_DATA: { + auto tx_event_action = tx_event_action_info::make( + uhd::async_metadata_t::EVENT_CODE_TIME_ERROR); + post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, + tx_event_action); UHD_LOG_FASTPATH("L"); + RFNOC_LOG_TRACE("Posting late data event action message."); break; + } } break; } diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp index 61d714a85..4fc1a3ff8 100644 --- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp @@ -14,6 +14,7 @@ using namespace uhd::rfnoc; const std::string STREAMER_ID = "TxStreamer"; static std::atomic<uint64_t> streamer_inst_ctr; +static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000; rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args) @@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) , _stream_args(stream_args) { + _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE); + // No block to which to forward properties or actions set_prop_forwarding_policy(forwarding_policy_t::DROP); set_action_forwarding_policy(forwarding_policy_t::DROP); + register_action_handler(ACTION_KEY_TX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + tx_event_action_info::sptr tx_event_action = + std::dynamic_pointer_cast<tx_event_action_info>(action); + if (!tx_event_action) { + RFNOC_LOG_WARNING("Received invalid TX event action!"); + return; + } + _handle_tx_event_action(src, tx_event_action); + }); + // Initialize properties _scaling_out.reserve(num_chans); _samp_rate_out.reserve(num_chans); @@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, for (auto& mtu_prop : _mtu_out) { mtu_resolver_out.insert(&mtu_prop); } - //property_t<size_t>* mtu_out = &_mtu_out.back(); add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out), [&mtu_out = _mtu_out[i], i, this]() { @@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel( const size_t mtu = xport->get_max_payload_size(); set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel}); + xport->set_enqueue_async_msg_fn( + [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) { + async_metadata_t md; + md.channel = channel; + md.event_code = event_code; + md.has_time_spec = has_tsf; + + if (has_tsf) { + md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate()); + } + + this->_async_msg_queue->enqueue(md); + }); + tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport)); } -void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format) +bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata, + double timeout) +{ + const auto timeout_ms = static_cast<uint64_t>(timeout * 1000); + return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms); +} + +void rfnoc_tx_streamer::_register_props(const size_t chan, + const std::string& otw_format) { // Create actual properties and store them _scaling_out.push_back(property_t<double>( @@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot register_property(mtu_out); // Add resolvers - add_property_resolver({scaling_out}, {}, - [&scaling_out = *scaling_out, chan, this]() { + add_property_resolver( + {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan); if (scaling_out.is_valid()) { this->set_scale_factor(chan, 32767.0 / scaling_out.get()); } }); - add_property_resolver({samp_rate_out}, {}, - [&samp_rate_out = *samp_rate_out, chan, this]() { + add_property_resolver( + {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan); if (samp_rate_out.is_valid()) { this->set_samp_rate(samp_rate_out.get()); } }); - add_property_resolver({tick_rate_out}, {}, - [&tick_rate_out = *tick_rate_out, chan, this]() { + add_property_resolver( + {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan); if (tick_rate_out.is_valid()) { this->set_tick_rate(tick_rate_out.get()); } }); } + +void rfnoc_tx_streamer::_handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action) +{ + UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE); + + uhd::async_metadata_t md; + md.event_code = tx_event_action->event_code; + md.channel = src.instance; + md.has_time_spec = tx_event_action->has_tsf; + + if (md.has_time_spec) { + md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate()); + } + + RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel); + _async_msg_queue->enqueue(md); +} diff --git a/host/lib/rfnoc/tx_async_msg_queue.cpp b/host/lib/rfnoc/tx_async_msg_queue.cpp new file mode 100644 index 000000000..71a05074f --- /dev/null +++ b/host/lib/rfnoc/tx_async_msg_queue.cpp @@ -0,0 +1,52 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/tx_async_msg_queue.hpp> +#include <chrono> +#include <thread> + +using namespace uhd; +using namespace uhd::rfnoc; + +tx_async_msg_queue::tx_async_msg_queue(size_t capacity) + : _queue(capacity) +{ +} + +bool tx_async_msg_queue::recv_async_msg(uhd::async_metadata_t& async_metadata, + int32_t timeout_ms) +{ + using namespace std::chrono; + + if (timeout_ms == 0.0) { + return _queue.pop(async_metadata); + } + + const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + + bool last_check = false; + + while (true) { + if (_queue.pop(async_metadata)) { + return true; + } + + if (steady_clock::now() > end_time) { + if (last_check) { + return false; + } else { + last_check = true; + } + } + + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } +} + +void tx_async_msg_queue::enqueue(const async_metadata_t& async_metadata) +{ + _queue.push(async_metadata); +} |