diff options
Diffstat (limited to 'host/lib')
-rw-r--r-- | host/lib/rfnoc/replay_block_control.cpp | 72 |
1 files changed, 72 insertions, 0 deletions
diff --git a/host/lib/rfnoc/replay_block_control.cpp b/host/lib/rfnoc/replay_block_control.cpp index b28476ea8..2ee31de93 100644 --- a/host/lib/rfnoc/replay_block_control.cpp +++ b/host/lib/rfnoc/replay_block_control.cpp @@ -11,6 +11,7 @@ #include <uhd/rfnoc/property.hpp> #include <uhd/rfnoc/registry.hpp> #include <uhd/rfnoc/replay_block_control.hpp> +#include <uhd/transport/bounded_buffer.hpp> #include <uhd/types/stream_cmd.hpp> #include <uhd/utils/math.hpp> #include <uhdlib/utils/compat_check.hpp> @@ -67,6 +68,9 @@ const char* const PROP_KEY_PLAY_OFFSET = "play_offset"; const char* const PROP_KEY_PLAY_SIZE = "play_size"; const char* const PROP_KEY_PKT_SIZE = "packet_size"; +// Depth of the async message queues +constexpr size_t ASYNC_MSG_QUEUE_SIZE = 128; + class replay_block_control_impl : public replay_block_control { public: @@ -124,6 +128,26 @@ public: } issue_stream_cmd(stream_cmd_action->stream_cmd, port); }); + register_action_handler(ACTION_KEY_RX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + rx_event_action_info::sptr rx_event_action = + std::dynamic_pointer_cast<rx_event_action_info>(action); + if (!rx_event_action) { + RFNOC_LOG_WARNING("Received invalid RX event action!"); + return; + } + _handle_rx_event_action(src, rx_event_action); + }); + register_action_handler(ACTION_KEY_TX_EVENT, + [this](const res_source_info& src, action_info::sptr action) { + tx_event_action_info::sptr tx_event_action = + std::dynamic_pointer_cast<tx_event_action_info>(action); + if (!tx_event_action) { + RFNOC_LOG_WARNING("Received invalid TX event action!"); + return; + } + _handle_tx_event_action(src, tx_event_action); + }); // Initialize record properties _record_type.reserve(_num_input_ports); @@ -242,6 +266,12 @@ public: return uhd::convert::get_bytes_per_item(get_record_type(port)); } + bool get_record_async_metadata( + uhd::rx_metadata_t& metadata, const double timeout = 0.0) + { + return _record_msg_queue.pop_with_timed_wait(metadata, timeout); + } + /************************************************************************** * Playback State API **************************************************************************/ @@ -276,6 +306,11 @@ public: return uhd::convert::get_bytes_per_item(get_play_type(port)); } + bool get_play_async_metadata(uhd::async_metadata_t& metadata, const double timeout) + { + return _playback_msg_queue.pop_with_timed_wait(metadata, timeout); + } + /************************************************************************** * Advanced Record Control API calls *************************************************************************/ @@ -583,6 +618,37 @@ private: } } + void _handle_rx_event_action( + const res_source_info& src, rx_event_action_info::sptr rx_event_action) + { + UHD_ASSERT_THROW(src.type == res_source_info::INPUT_EDGE); + uhd::rx_metadata_t rx_md{}; + rx_md.error_code = rx_event_action->error_code; + RFNOC_LOG_DEBUG("Received RX error code on channel " + << src.instance << ", error code " << rx_md.strerror()); + _record_msg_queue.push_with_pop_on_full(rx_md); + } + + void _handle_tx_event_action( + const res_source_info& src, tx_event_action_info::sptr tx_event_action) + { + UHD_ASSERT_THROW(src.type == res_source_info::OUTPUT_EDGE); + + uhd::async_metadata_t md; + md.event_code = tx_event_action->event_code; + md.channel = src.instance; + md.has_time_spec = tx_event_action->has_tsf; + + if (md.has_time_spec) { + md.time_spec = + uhd::time_spec_t::from_ticks(tx_event_action->tsf, get_tick_rate()); + } + RFNOC_LOG_DEBUG("Received TX error code on channel " + << src.instance << ", error code " + << static_cast<int>(md.event_code)); + _playback_msg_queue.push_with_pop_on_full(md); + } + /************************************************************************** * Attributes *************************************************************************/ @@ -605,6 +671,12 @@ private: std::vector<property_t<uint32_t>> _packet_size; std::vector<property_t<size_t>> _atomic_item_size_in; std::vector<property_t<size_t>> _atomic_item_size_out; + + // Message queues for async data + uhd::transport::bounded_buffer<uhd::async_metadata_t> _playback_msg_queue{ + ASYNC_MSG_QUEUE_SIZE}; + uhd::transport::bounded_buffer<uhd::rx_metadata_t> _record_msg_queue{ + ASYNC_MSG_QUEUE_SIZE}; }; UHD_RFNOC_BLOCK_REGISTER_DIRECT( |