// // Copyright 2018 Ettus Research, a National Instruments Company // // SPDX-License-Identifier: GPL-3.0-or-later // #include #include #include #include #include #include #include #include #include #include using namespace uhd; using namespace uhd::rfnoc; template class async_msg_handler_impl : public async_msg_handler { public: /************************************************************************ * Types ***********************************************************************/ typedef uhd::transport::bounded_buffer async_md_type; /************************************************************************ * Structors ***********************************************************************/ async_msg_handler_impl( uhd::transport::zero_copy_if::sptr recv, uhd::transport::zero_copy_if::sptr send, uhd::sid_t sid ) : _rx_xport(recv), _tx_xport(send), _sid(sid) { // Launch receive thread _recv_msg_task = task::make([=](){ this->handle_async_msgs(); } ); } ~async_msg_handler_impl() {} /************************************************************************ * API calls ***********************************************************************/ int register_event_handler( const async_msg_t::event_code_t event_code, async_handler_type handler ) { _event_handlers.insert(std::pair(event_code, handler)); return _event_handlers.count(event_code); } void post_async_msg( const async_msg_t &metadata ) { std::lock_guard lock(_mutex); for (auto const event_handler : _event_handlers) { // If the event code in the message matches the event code used at // registration time, call the event handler if ((metadata.event_code & event_handler.first) == event_handler.first) { event_handler.second(metadata); } } // Print if (metadata.event_code & async_msg_t::EVENT_CODE_UNDERFLOW) { UHD_LOG_FASTPATH("U") } else if (metadata.event_code & ( async_msg_t::EVENT_CODE_SEQ_ERROR | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST) ) { UHD_LOG_FASTPATH("S") } else if (metadata.event_code & ( async_msg_t::EVENT_CODE_LATE_CMD_ERROR | async_msg_t::EVENT_CODE_LATE_DATA_ERROR) ) { UHD_LOG_FASTPATH("L") } else if (metadata.event_code & async_msg_t::EVENT_CODE_OVERRUN) { UHD_LOG_FASTPATH("O") } } private: // methods /************************************************************************ * Internals ***********************************************************************/ /*! Packet receiver thread call. */ void handle_async_msgs( ) { using namespace uhd::transport; managed_recv_buffer::sptr buff = _rx_xport->get_recv_buff(); if (not buff) return; // Get packet info vrt::if_packet_info_t if_packet_info; if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t); const uint32_t *packet_buff = buff->cast(); //unpacking can fail uint32_t (*endian_conv)(uint32_t) = uhd::ntohx; try { if (_endianness == ENDIANNESS_BIG) { vrt::chdr::if_hdr_unpack_be(packet_buff, if_packet_info); endian_conv = uhd::ntohx; } else { vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info); endian_conv = uhd::wtohx; } } catch (const uhd::value_error &ex) { UHD_LOGGER_ERROR("RFNOC") << "[async message handler] Error parsing async message packet: " << ex.what() << std::endl; return; } // We discard anything that's not actually a command or response packet. if (not (if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD) or if_packet_info.num_packet_words32 == 0) { return; } const uint32_t *payload = packet_buff + if_packet_info.num_header_words32; async_msg_t metadata(if_packet_info.num_payload_words32 - 1); metadata.has_time_spec = if_packet_info.has_tsf; // FIXME: not hardcoding tick rate metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, 1); metadata.event_code = async_msg_t::event_code_t( endian_conv(payload[0]) & 0xFFFF ); metadata.sid = if_packet_info.sid; //load user payload for (size_t i = 1; i < if_packet_info.num_payload_words32; i++) { metadata.payload[i-1] = endian_conv(payload[i]); } this->post_async_msg(metadata); } uint32_t get_local_addr() const { return _sid.get_src(); } private: // members std::mutex _mutex; //! Store event handlers std::multimap _event_handlers; //! port that receive messge uhd::transport::zero_copy_if::sptr _rx_xport; //!port that send out respond uhd::transport::zero_copy_if::sptr _tx_xport; //! The source part of \p _sid is the address of this async message handler. uhd::sid_t _sid; //! Stores the task that polls the Rx queue task::sptr _recv_msg_task; }; async_msg_handler::sptr async_msg_handler::make( uhd::transport::zero_copy_if::sptr recv, uhd::transport::zero_copy_if::sptr send, uhd::sid_t sid, endianness_t endianness ) { if (endianness == ENDIANNESS_BIG) { return boost::make_shared< async_msg_handler_impl >( recv, send, sid ); } else { return boost::make_shared< async_msg_handler_impl >( recv, send, sid ); } }