/*
   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
   Research Center Canada)

   Copyright (C) 2018
   Matthias P. Braendli, matthias.braendli@mpb.li

    http://www.opendigitalradio.org

   A server that serves state information and statistics for
   monitoring purposes, and also serves the internal configuration
   property tree.

   This statistics server is very easy to integrate with munin
        http://munin-monitoring.org/
   but is not specific to it.

   The responds in JSON, and accepts the commands:
    - config
    - values
      Inspired by the munin equivalent, returns the configuration
      and the statistics values for every exported stat.

    - getptree
      Returns the internal boost property_tree that contains the
      multiplexer configuration DB.

   The server is using REQ/REP ZeroMQ sockets.
   */
/*
   This file is part of ODR-DabMux.

   ODR-DabMux is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as
   published by the Free Software Foundation, either version 3 of the
   License, or (at your option) any later version.

   ODR-DabMux is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.
   */

#pragma once

#ifdef HAVE_CONFIG_H
#   include "config.h"
#endif

#include "zmq.hpp"
#include <string>
#include <map>
#include <atomic>
#include <chrono>
#include <deque>
#include <thread>
#include <mutex>

// Suppress an deprecation warning from boost
#define BOOST_BIND_GLOBAL_PLACEHOLDERS

#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <cmath>

/*** State handing ***/
/* An input can be in one of the following three states:
 */
enum class input_state_t
{
    /* The input is waiting for data, all buffers are empty */
    NoData,
    /* The input is running, but has seen many underruns or overruns recently */
    Unstable,
    /* The input is running, but the audio level is too low, or has
     * been too low recently
     */
    Silence,
    /* The input is running stable */
    Streaming
};

/* InputStat takes care of
 * - saving the statistics for graphing
 * - calculating the state of the input for monitoring
 */
class InputStat
{
    public:
        InputStat(const std::string& name);
        InputStat(const InputStat& other) = delete;
        InputStat& operator=(const InputStat& other) = delete;
        ~InputStat();
        void registerAtServer(void);

        std::string get_name(void) const { return m_name; }

        /* This function is called for every frame read by
         * the multiplexer */
        void notifyBuffer(long bufsize);
        void notifyTimestampOffset(double offset);
        void notifyPeakLevels(int peak_left, int peak_right);
        void notifyUnderrun(void);
        void notifyOverrun(void);
        void notifyVersion(const std::string& version, uint32_t uptime_s);
        std::string encodeValuesJSON(void);
        input_state_t determineState(void);

    private:
        std::string m_name;

        // Remove all expired fill and peak stats
        void prune_statistics(const std::chrono::time_point<std::chrono::steady_clock>& timestamp);

        /************ STATISTICS ***********/
        // Keep track of buffer fill with timestamps, so that we
        // can calculate the correct state from it.
        struct fill_stat_t {
            std::chrono::time_point<std::chrono::steady_clock> timestamp;
            long bufsize;
        };
        std::deque<fill_stat_t> m_buffer_fill_stats;

        // counter of number of overruns and underruns since startup
        uint32_t m_num_underruns = 0;
        uint32_t m_num_overruns = 0;

        // last measured timestamp offset
        double m_last_tist_offset = 0;

        // Peak audio levels (linear 16-bit PCM) for the two channels.
        // Keep a FIFO of values from the last minutes, apply
        // a short window to also see short-term fluctuations.
        struct peak_stat_t {
            std::chrono::time_point<std::chrono::steady_clock> timestamp;
            int peak_left;
            int peak_right;
        };
        std::deque<peak_stat_t> m_peak_stats;

        size_t m_short_window_length = 0;

        std::string m_version;
        uint32_t m_uptime_s = 0;

        /************* STATE ***************/
        /* Variables used for determining the input state */
        int m_glitch_counter = 0; // saturating counter
        int m_silence_counter = 0; // saturating counter
        std::chrono::time_point<std::chrono::steady_clock> m_time_last_event;

        // The mutex that has to be held during all notify and readout
        mutable std::mutex m_mutex;
};

class ManagementServer
{
    public:
        ManagementServer();
        ManagementServer(const ManagementServer& other) = delete;
        ManagementServer& operator=(const ManagementServer& other) = delete;
        ~ManagementServer();

        void open(int listenport);

        /* Un-/Register a statistics data source */
        void registerInput(InputStat* is);
        void unregisterInput(std::string id);

        /* Load a ptree given by the management server.
         *
         * Returns true if the ptree was updated
         */
        bool retrieve_new_ptree(boost::property_tree::ptree& pt);

        /* Update the copy of the configuration property tree and notify the
         * update to the internal server thread.
         */
        void update_ptree(const boost::property_tree::ptree& pt);

        bool fault_detected() const { return m_fault; }
        void restart(void);

    private:
        void restart_thread(long);

        /******* Server ******/
        zmq::context_t m_zmq_context;
        zmq::socket_t  m_zmq_sock;

        void serverThread(void);
        void handle_message(zmq::message_t& zmq_message);

        bool isInputRegistered(std::string& id);

        int m_listenport = 0;

        // serverThread runs in a separate thread
        std::atomic<bool> m_running;
        std::atomic<bool> m_fault;
        std::thread m_thread;
        std::thread m_restarter_thread;

        /******* Statistics Data ********/
        std::map<std::string, InputStat*> m_inputStats;

        /* Return a description of the configuration that will
         * allow to define what graphs to be created
         *
         * returns: a JSON encoded configuration
         */
        std::string getStatConfigJSON();

        /* Return the values for the statistics as defined in the configuration
         *
         * returns: JSON encoded statistics
         */
        std::string getValuesJSON();

        // mutex for accessing the map
        std::mutex m_statsmutex;

        /******** Configuration Data *******/
        std::mutex m_configmutex;
        boost::property_tree::ptree m_pt;
};

// If necessary construct the management server singleton and return
// a reference to it
ManagementServer& get_mgmt_server();