diff options
Diffstat (limited to 'host/lib/rfnoc')
-rw-r--r-- | host/lib/rfnoc/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/rfnoc/actions.cpp | 20 | ||||
-rw-r--r-- | host/lib/rfnoc/radio_control_impl.cpp | 16 | ||||
-rw-r--r-- | host/lib/rfnoc/rfnoc_tx_streamer.cpp | 69 | ||||
-rw-r--r-- | host/lib/rfnoc/tx_async_msg_queue.cpp | 52 |
5 files changed, 148 insertions, 10 deletions
diff --git a/host/lib/rfnoc/CMakeLists.txt b/host/lib/rfnoc/CMakeLists.txt index a88507dcd..73de394e3 100644 --- a/host/lib/rfnoc/CMakeLists.txt +++ b/host/lib/rfnoc/CMakeLists.txt @@ -55,6 +55,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/wb_iface_adapter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_rx_streamer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_tx_streamer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/tx_async_msg_queue.cpp # Default block control classes: ${CMAKE_CURRENT_SOURCE_DIR}/block_control.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ddc_block_control.cpp diff --git a/host/lib/rfnoc/actions.cpp b/host/lib/rfnoc/actions.cpp index d4ed4559f..4de05b304 100644 --- a/host/lib/rfnoc/actions.cpp +++ b/host/lib/rfnoc/actions.cpp @@ -63,3 +63,23 @@ rx_event_action_info::sptr rx_event_action_info::make() }; return std::make_shared<rx_event_action_info_make_shared>(); } + +/*** TX Metadata Action Info *************************************************/ +tx_event_action_info::tx_event_action_info( + uhd::async_metadata_t::event_code_t event_code_) + : action_info(ACTION_KEY_TX_EVENT), event_code(event_code_) +{ +} + +tx_event_action_info::sptr tx_event_action_info::make( + uhd::async_metadata_t::event_code_t event_code) +{ + struct tx_event_action_info_make_shared : public tx_event_action_info + { + tx_event_action_info_make_shared(uhd::async_metadata_t::event_code_t event_code) + : tx_event_action_info(event_code) + { + } + }; + return std::make_shared<tx_event_action_info_make_shared>(event_code); +} diff --git a/host/lib/rfnoc/radio_control_impl.cpp b/host/lib/rfnoc/radio_control_impl.cpp index 2094f4096..9d7257108 100644 --- a/host/lib/rfnoc/radio_control_impl.cpp +++ b/host/lib/rfnoc/radio_control_impl.cpp @@ -928,12 +928,24 @@ void radio_control_impl::async_message_handler( return; } switch (code) { - case err_codes::ERR_TX_UNDERRUN: + case err_codes::ERR_TX_UNDERRUN: { + auto tx_event_action = tx_event_action_info::make( + uhd::async_metadata_t::EVENT_CODE_UNDERFLOW); + post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, + tx_event_action); UHD_LOG_FASTPATH("U"); + RFNOC_LOG_TRACE("Posting underrun event action message."); break; - case err_codes::ERR_TX_LATE_DATA: + } + case err_codes::ERR_TX_LATE_DATA: { + auto tx_event_action = tx_event_action_info::make( + uhd::async_metadata_t::EVENT_CODE_TIME_ERROR); + post_action(res_source_info{res_source_info::INPUT_EDGE, chan}, + tx_event_action); UHD_LOG_FASTPATH("L"); + RFNOC_LOG_TRACE("Posting late data event action message."); break; + } } break; } diff --git a/host/lib/rfnoc/rfnoc_tx_streamer.cpp b/host/lib/rfnoc/rfnoc_tx_streamer.cpp index 61d714a85..4fc1a3ff8 100644 --- a/host/lib/rfnoc/rfnoc_tx_streamer.cpp +++ b/host/lib/rfnoc/rfnoc_tx_streamer.cpp @@ -14,6 +14,7 @@ using namespace uhd::rfnoc; const std::string STREAMER_ID = "TxStreamer"; static std::atomic<uint64_t> streamer_inst_ctr; +static constexpr size_t ASYNC_MSG_QUEUE_SIZE = 1000; rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, const uhd::stream_args_t stream_args) @@ -21,10 +22,23 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, , _unique_id(STREAMER_ID + "#" + std::to_string(streamer_inst_ctr++)) , _stream_args(stream_args) { + _async_msg_queue = std::make_shared<tx_async_msg_queue>(ASYNC_MSG_QUEUE_SIZE); + // No block to which to forward properties or actions set_prop_forwarding_policy(forwarding_policy_t::DROP); set_action_forwarding_policy(forwarding_policy_t::DROP); + register_action_handler(ACTION_KEY_TX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + tx_event_action_info::sptr tx_event_action = + std::dynamic_pointer_cast<tx_event_action_info>(action); + if (!tx_event_action) { + RFNOC_LOG_WARNING("Received invalid TX event action!"); + return; + } + _handle_tx_event_action(src, tx_event_action); + }); + // Initialize properties _scaling_out.reserve(num_chans); _samp_rate_out.reserve(num_chans); @@ -41,7 +55,6 @@ rfnoc_tx_streamer::rfnoc_tx_streamer(const size_t num_chans, for (auto& mtu_prop : _mtu_out) { mtu_resolver_out.insert(&mtu_prop); } - //property_t<size_t>* mtu_out = &_mtu_out.back(); add_property_resolver({&_mtu_out[i]}, std::move(mtu_resolver_out), [&mtu_out = _mtu_out[i], i, this]() { @@ -105,10 +118,32 @@ void rfnoc_tx_streamer::connect_channel( const size_t mtu = xport->get_max_payload_size(); set_property<size_t>(PROP_KEY_MTU, mtu, {res_source_info::OUTPUT_EDGE, channel}); + xport->set_enqueue_async_msg_fn( + [this, channel](async_metadata_t::event_code_t event_code, bool has_tsf, uint64_t tsf) { + async_metadata_t md; + md.channel = channel; + md.event_code = event_code; + md.has_time_spec = has_tsf; + + if (has_tsf) { + md.time_spec = time_spec_t::from_ticks(tsf, get_tick_rate()); + } + + this->_async_msg_queue->enqueue(md); + }); + tx_streamer_impl<chdr_tx_data_xport>::connect_channel(channel, std::move(xport)); } -void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& otw_format) +bool rfnoc_tx_streamer::recv_async_msg(uhd::async_metadata_t& async_metadata, + double timeout) +{ + const auto timeout_ms = static_cast<uint64_t>(timeout * 1000); + return _async_msg_queue->recv_async_msg(async_metadata, timeout_ms); +} + +void rfnoc_tx_streamer::_register_props(const size_t chan, + const std::string& otw_format) { // Create actual properties and store them _scaling_out.push_back(property_t<double>( @@ -137,27 +172,45 @@ void rfnoc_tx_streamer::_register_props(const size_t chan, const std::string& ot register_property(mtu_out); // Add resolvers - add_property_resolver({scaling_out}, {}, - [&scaling_out = *scaling_out, chan, this]() { + add_property_resolver( + {scaling_out}, {}, [& scaling_out = *scaling_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `scaling_out'@" << chan); if (scaling_out.is_valid()) { this->set_scale_factor(chan, 32767.0 / scaling_out.get()); } }); - add_property_resolver({samp_rate_out}, {}, - [&samp_rate_out = *samp_rate_out, chan, this]() { + add_property_resolver( + {samp_rate_out}, {}, [& samp_rate_out = *samp_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `samp_rate_out'@" << chan); if (samp_rate_out.is_valid()) { this->set_samp_rate(samp_rate_out.get()); } }); - add_property_resolver({tick_rate_out}, {}, - [&tick_rate_out = *tick_rate_out, chan, this]() { + add_property_resolver( + {tick_rate_out}, {}, [& tick_rate_out = *tick_rate_out, chan, this]() { RFNOC_LOG_TRACE("Calling resolver for `tick_rate_out'@" << chan); if (tick_rate_out.is_valid()) { this->set_tick_rate(tick_rate_out.get()); } }); } + +void rfnoc_tx_streamer::_handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action) +{ + UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE); + + uhd::async_metadata_t md; + md.event_code = tx_event_action->event_code; + md.channel = src.instance; + md.has_time_spec = tx_event_action->has_tsf; + + if (md.has_time_spec) { + md.time_spec = time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate()); + } + + RFNOC_LOG_TRACE("Pushing metadata onto tx async msg queue, channel " << md.channel); + _async_msg_queue->enqueue(md); +} diff --git a/host/lib/rfnoc/tx_async_msg_queue.cpp b/host/lib/rfnoc/tx_async_msg_queue.cpp new file mode 100644 index 000000000..71a05074f --- /dev/null +++ b/host/lib/rfnoc/tx_async_msg_queue.cpp @@ -0,0 +1,52 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhdlib/rfnoc/tx_async_msg_queue.hpp> +#include <chrono> +#include <thread> + +using namespace uhd; +using namespace uhd::rfnoc; + +tx_async_msg_queue::tx_async_msg_queue(size_t capacity) + : _queue(capacity) +{ +} + +bool tx_async_msg_queue::recv_async_msg(uhd::async_metadata_t& async_metadata, + int32_t timeout_ms) +{ + using namespace std::chrono; + + if (timeout_ms == 0.0) { + return _queue.pop(async_metadata); + } + + const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + + bool last_check = false; + + while (true) { + if (_queue.pop(async_metadata)) { + return true; + } + + if (steady_clock::now() > end_time) { + if (last_check) { + return false; + } else { + last_check = true; + } + } + + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } +} + +void tx_async_msg_queue::enqueue(const async_metadata_t& async_metadata) +{ + _queue.push(async_metadata); +} |