diff options
Diffstat (limited to 'host/lib/rfnoc/async_msg_handler.cpp')
-rw-r--r-- | host/lib/rfnoc/async_msg_handler.cpp | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/host/lib/rfnoc/async_msg_handler.cpp b/host/lib/rfnoc/async_msg_handler.cpp new file mode 100644 index 000000000..b412eec9d --- /dev/null +++ b/host/lib/rfnoc/async_msg_handler.cpp @@ -0,0 +1,190 @@ +// +// Copyright 2018 Ettus Research, a National Instruments Company +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <uhd/exception.hpp> +#include <uhd/utils/tasks.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/utils/log.hpp> +#include <uhd/transport/chdr.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhdlib/rfnoc/async_msg_handler.hpp> +#include <boost/make_shared.hpp> +#include <mutex> + +using namespace uhd; +using namespace uhd::rfnoc; + +template <endianness_t _endianness> +class async_msg_handler_impl : public async_msg_handler +{ +public: + /************************************************************************ + * Types + ***********************************************************************/ + typedef uhd::transport::bounded_buffer<async_msg_t> async_md_type; + + /************************************************************************ + * Structors + ***********************************************************************/ + async_msg_handler_impl( + uhd::transport::zero_copy_if::sptr recv, + uhd::transport::zero_copy_if::sptr send, + uhd::sid_t sid + ) : _rx_xport(recv), + _tx_xport(send), + _sid(sid) + { + // Launch receive thread + _recv_msg_task = task::make([=](){ + this->handle_async_msgs(); + } + ); + } + + ~async_msg_handler_impl() {} + + /************************************************************************ + * API calls + ***********************************************************************/ + int register_event_handler( + const async_msg_t::event_code_t event_code, + async_handler_type handler + ) { + _event_handlers.insert(std::pair<async_msg_t::event_code_t, async_handler_type>(event_code, handler)); + return _event_handlers.count(event_code); + } + + void post_async_msg( + const async_msg_t &metadata + ) { + std::lock_guard<std::mutex> lock(_mutex); + + for (auto const event_handler : _event_handlers) { + // If the event code in the message matches the event code used at + // registration time, call the event handler + if ((metadata.event_code & event_handler.first) + == event_handler.first) { + event_handler.second(metadata); + } + } + + // Print + if (metadata.event_code & async_msg_t::EVENT_CODE_UNDERFLOW) { + UHD_LOG_FASTPATH("U") + } else if (metadata.event_code & + ( async_msg_t::EVENT_CODE_SEQ_ERROR + | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST) + ) { + UHD_LOG_FASTPATH("S") + } else if (metadata.event_code & + ( async_msg_t::EVENT_CODE_LATE_CMD_ERROR + | async_msg_t::EVENT_CODE_LATE_DATA_ERROR) + ) { + UHD_LOG_FASTPATH("L") + } else if (metadata.event_code & async_msg_t::EVENT_CODE_OVERRUN) { + UHD_LOG_FASTPATH("O") + } + } + +private: // methods + /************************************************************************ + * Internals + ***********************************************************************/ + /*! Packet receiver thread call. + */ + void handle_async_msgs( ) + { + using namespace uhd::transport; + managed_recv_buffer::sptr buff = _rx_xport->get_recv_buff(); + if (not buff) + return; + + // Get packet info + vrt::if_packet_info_t if_packet_info; + if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); + const uint32_t *packet_buff = buff->cast<const uint32_t *>(); + + //unpacking can fail + uint32_t (*endian_conv)(uint32_t) = uhd::ntohx; + try { + if (_endianness == ENDIANNESS_BIG) { + vrt::chdr::if_hdr_unpack_be(packet_buff, if_packet_info); + endian_conv = uhd::ntohx; + } else { + vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info); + endian_conv = uhd::wtohx; + } + } + catch (const uhd::value_error &ex) { + UHD_LOGGER_ERROR("RFNOC") << "[async message handler] Error parsing async message packet: " << ex.what() << std::endl; + return; + } + + // We discard anything that's not actually a command or response packet. + if (not (if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD) + or if_packet_info.num_packet_words32 == 0) { + return; + } + + const uint32_t *payload = packet_buff + if_packet_info.num_header_words32; + async_msg_t metadata(if_packet_info.num_payload_words32 - 1); + metadata.has_time_spec = if_packet_info.has_tsf; + // FIXME: not hardcoding tick rate + metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, 1); + metadata.event_code = async_msg_t::event_code_t( + endian_conv(payload[0]) & 0xFFFF + ); + metadata.sid = if_packet_info.sid; + + //load user payload + for (size_t i = 1; i < if_packet_info.num_payload_words32; i++) { + metadata.payload[i-1] = endian_conv(payload[i]); + } + + this->post_async_msg(metadata); + } + + uint32_t get_local_addr() const + { + return _sid.get_src(); + } + +private: // members + + std::mutex _mutex; + //! Store event handlers + std::multimap<async_msg_t::event_code_t, async_handler_type> _event_handlers; + //! port that receive messge + uhd::transport::zero_copy_if::sptr _rx_xport; + + //!port that send out respond + uhd::transport::zero_copy_if::sptr _tx_xport; + + //! The source part of \p _sid is the address of this async message handler. + uhd::sid_t _sid; + + //! Stores the task that polls the Rx queue + task::sptr _recv_msg_task; +}; + +async_msg_handler::sptr async_msg_handler::make( + uhd::transport::zero_copy_if::sptr recv, + uhd::transport::zero_copy_if::sptr send, + uhd::sid_t sid, + endianness_t endianness +) { + if (endianness == ENDIANNESS_BIG) { + return boost::make_shared< async_msg_handler_impl<ENDIANNESS_BIG> >( + recv, send, sid + ); + } else { + return boost::make_shared< async_msg_handler_impl<ENDIANNESS_LITTLE> >( + recv, send, sid + ); + } +} + |