//
// Copyright 2018 Ettus Research, a National Instruments Company
//
// SPDX-License-Identifier: GPL-3.0-or-later
//

#include <uhd/exception.hpp>
#include <uhd/utils/tasks.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/log.hpp>
#include <uhd/transport/chdr.hpp>
#include <uhd/transport/zero_copy.hpp>
#include <uhd/transport/bounded_buffer.hpp>
#include <uhdlib/rfnoc/async_msg_handler.hpp>
#include <boost/make_shared.hpp>
#include <mutex>

using namespace uhd;
using namespace uhd::rfnoc;

template <endianness_t _endianness>
class async_msg_handler_impl : public async_msg_handler
{
public:
    /************************************************************************
     * Types
     ***********************************************************************/
    typedef uhd::transport::bounded_buffer<async_msg_t> async_md_type;

    /************************************************************************
     * Structors
     ***********************************************************************/
    async_msg_handler_impl(
            uhd::transport::zero_copy_if::sptr recv,
            uhd::transport::zero_copy_if::sptr send,
            uhd::sid_t sid
    ) : _rx_xport(recv),
        _tx_xport(send),
        _sid(sid)
    {
        // Launch receive thread
        _recv_msg_task = task::make([=](){
                this->handle_async_msgs();
            }
        );
    }

    ~async_msg_handler_impl() {}

    /************************************************************************
     * API calls
     ***********************************************************************/
    int register_event_handler(
            const async_msg_t::event_code_t event_code,
            async_handler_type handler
    ) {
        _event_handlers.insert(std::pair<async_msg_t::event_code_t, async_handler_type>(event_code, handler));
        return _event_handlers.count(event_code);
    }

    void post_async_msg(
            const async_msg_t &metadata
    ) {
        std::lock_guard<std::mutex> lock(_mutex);

        for (auto const event_handler : _event_handlers) {
            // If the event code in the message matches the event code used at
            // registration time, call the event handler
            if ((metadata.event_code & event_handler.first)
                    == event_handler.first) {
                event_handler.second(metadata);
            }
        }

        // Print
        if (metadata.event_code & async_msg_t::EVENT_CODE_UNDERFLOW) {
            UHD_LOG_FASTPATH("U")
        } else if (metadata.event_code &
            ( async_msg_t::EVENT_CODE_SEQ_ERROR
            | async_msg_t::EVENT_CODE_SEQ_ERROR_IN_BURST)
        ) {
            UHD_LOG_FASTPATH("S")
        } else if (metadata.event_code &
            ( async_msg_t::EVENT_CODE_LATE_CMD_ERROR
            | async_msg_t::EVENT_CODE_LATE_DATA_ERROR)
        ) {
            UHD_LOG_FASTPATH("L")
        } else if (metadata.event_code & async_msg_t::EVENT_CODE_OVERRUN) {
            UHD_LOG_FASTPATH("O")
        }
    }

private: // methods
    /************************************************************************
     * Internals
     ***********************************************************************/
    /*! Packet receiver thread call.
     */
    void handle_async_msgs(  )
    {
        using namespace uhd::transport;
        managed_recv_buffer::sptr buff = _rx_xport->get_recv_buff();
        if (not buff)
            return;

        // Get packet info
        vrt::if_packet_info_t if_packet_info;
        if_packet_info.num_packet_words32 = buff->size()/sizeof(uint32_t);
        const uint32_t *packet_buff = buff->cast<const uint32_t *>();

        //unpacking can fail
        uint32_t (*endian_conv)(uint32_t) = uhd::ntohx;
        try {
            if (_endianness == ENDIANNESS_BIG) {
                vrt::chdr::if_hdr_unpack_be(packet_buff, if_packet_info);
                endian_conv = uhd::ntohx;
            } else {
                vrt::chdr::if_hdr_unpack_le(packet_buff, if_packet_info);
                endian_conv = uhd::wtohx;
            }
        }
        catch (const uhd::value_error &ex) {
            UHD_LOGGER_ERROR("RFNOC") << "[async message handler] Error parsing async message packet: " << ex.what() << std::endl;
            return;
        }

        // We discard anything that's not actually a command or response packet.
        if (not (if_packet_info.packet_type & vrt::if_packet_info_t::PACKET_TYPE_CMD)
                or if_packet_info.num_packet_words32 == 0) {
            return;
        }

        const uint32_t *payload = packet_buff + if_packet_info.num_header_words32;
        async_msg_t metadata(if_packet_info.num_payload_words32 - 1);
        metadata.has_time_spec = if_packet_info.has_tsf;
        // FIXME: not hardcoding tick rate
        metadata.time_spec = time_spec_t::from_ticks(if_packet_info.tsf, 1);
        metadata.event_code = async_msg_t::event_code_t(
                endian_conv(payload[0]) & 0xFFFF
        );
        metadata.sid = if_packet_info.sid;

        //load user payload
        for (size_t i = 1; i < if_packet_info.num_payload_words32; i++) {
            metadata.payload[i-1] = endian_conv(payload[i]);
        }

        this->post_async_msg(metadata);
    }

    uint32_t get_local_addr() const
    {
        return _sid.get_src();
    }
    
private: // members

    std::mutex _mutex;
    //! Store event handlers
    std::multimap<async_msg_t::event_code_t, async_handler_type> _event_handlers;
    //! port that receive messge
    uhd::transport::zero_copy_if::sptr _rx_xport;

    //!port that send out respond
    uhd::transport::zero_copy_if::sptr _tx_xport;

    //! The source part of \p _sid is the address of this async message handler.
    uhd::sid_t _sid;

    //! Stores the task that polls the Rx queue
    task::sptr _recv_msg_task;
};

async_msg_handler::sptr async_msg_handler::make(
    uhd::transport::zero_copy_if::sptr recv,
    uhd::transport::zero_copy_if::sptr send,
    uhd::sid_t sid,
    endianness_t endianness
) {
    if (endianness == ENDIANNESS_BIG) {
        return boost::make_shared< async_msg_handler_impl<ENDIANNESS_BIG> >(
            recv, send, sid
        );
    } else {
        return boost::make_shared< async_msg_handler_impl<ENDIANNESS_LITTLE> >(
            recv, send, sid
        );
    }
}