/*
   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
   Research Center Canada)

   Copyright (C) 2014, 2015
   Matthias P. Braendli, matthias.braendli@mpb.li

    http://www.opendigitalradio.org

   A server that serves state information and statistics for
   monitoring purposes, and also serves the internal configuration
   property tree.

   This statistics server is very easy to integrate with munin
        http://munin-monitoring.org/
   but is not specific to it.

   The server responds in JSON, and accepts the commands:
    - config
    - values
      Inspired by the munin equivalent

    - state
      Returns the state of each input

    - getptree
      Returns the internal boost property_tree that contains the
      multiplexer configuration DB.

   The server is using REQ/REP ZeroMQ sockets.
   */
/*
   This file is part of ODR-DabMux.

   ODR-DabMux is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as
   published by the Free Software Foundation, either version 3 of the
   License, or (at your option) any later version.

   ODR-DabMux is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.
*/ #ifndef __MANAGEMENT_SERVER_H #define __MANAGEMENT_SERVER_H #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "zmq.hpp" #include <string> #include <map> #include <atomic> #include <boost/thread.hpp> #include <boost/bind.hpp> #include <boost/property_tree/ptree.hpp> #include <boost/property_tree/json_parser.hpp> #include <ctime> #include <math.h> #define MIN_FILL_BUFFER_UNDEF (-1) /*** State handing ***/ /* An input can be in one of the following three states: */ enum input_state_t { /* The input is waiting for data, all buffers are empty */ NoData, /* The input is running, but has seen many underruns or overruns recently */ Unstable, /* The input is running, but the audio level is too low, or has * been too low recently */ Silence, /* The input is running stable */ Streaming }; /* The delay after which the glitch counter is reset */ #define INPUT_COUNTER_RESET_TIME 30 // minutes /* How many glitches we tolerate in Streaming state before * we consider the input Unstable */ #define INPUT_UNSTABLE_THRESHOLD 3 /* For how long the input buffers must be empty before we move an input to the * NoData state. */ #define INPUT_NODATA_TIMEOUT 30 // seconds /* For silence detection, we count the number of occurrences the audio level * falls below a threshold. * * The counter is decreased for each frame that has good audio level. * * The counter saturates, and this value defines for how long the * input will be considered silent after a cut. * * If the count reaches a certain value, the input changes state * to Silence. */ #define INPUT_AUDIO_LEVEL_THRESHOLD -50 // dB #define INPUT_AUDIO_LEVEL_SILENCE_COUNT 100 // superframes (120ms) #define INPUT_AUDIO_LEVEL_COUNT_SATURATION 500 // superframes (120ms) /* An example of how the state changes work. * The timeout is set to expire in 30 minutes * at each under-/overrun. * * The glitch counter is increased by one for each glitch (can be a * saturating counter), and set to zero when the counter timeout expires. 
* * The state is then simply depending on the glitch counter value. * * Graphical example: state STREAMING | UNSTABLE | STREAMING xruns U U U glitch counter 0 1 2 3 0 reset timeout \ |\ |\ |\ \ | \ | \ | \ \ | \ | \ | \ \| \ | \| \ ` \| ` \ ` \ \ \ \ \ timeout expires ___________________\ <--30min--> */ /* InputStat takes care of * - saving the statistics for graphing * - calculating the state of the input for monitoring */ class InputStat { public: InputStat(std::string name) : m_name(name) { /* Statistics */ num_underruns = 0; num_overruns = 0; /* State handling */ time_t now = time(NULL); m_time_last_event = now; m_time_last_buffer_nonempty = 0; m_buffer_empty = true; m_glitch_counter = 0; m_silence_counter = 0; reset(); } void registerAtServer(void); ~InputStat(); // Gets called each time the statistics are transmitted, // and resets the counters to zero void reset(void) { min_fill_buffer = MIN_FILL_BUFFER_UNDEF; max_fill_buffer = 0; peak_left = 0; peak_right = 0; } std::string& get_name(void) { return m_name; } /* This function is called for every frame read by * the multiplexer */ void notifyBuffer(long bufsize) { boost::mutex::scoped_lock lock(m_mutex); // Statistics if (bufsize > max_fill_buffer) { max_fill_buffer = bufsize; } if (bufsize < min_fill_buffer || min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { min_fill_buffer = bufsize; } // State m_buffer_empty = (bufsize == 0); if (!m_buffer_empty) { m_time_last_buffer_nonempty = time(NULL); } } void notifyPeakLevels(int peak_left, int peak_right) { boost::mutex::scoped_lock lock(m_mutex); // Statistics if (peak_left > this->peak_left) { this->peak_left = peak_left; } if (peak_right > this->peak_right) { this->peak_right = peak_right; } // State // using the smallest of the two channels // allows us to detect if only one channel // is silent. int minpeak = peak_left < peak_right ? peak_left : peak_right; const int16_t int16_max = std::numeric_limits<int16_t>::max(); int peak_dB = minpeak ? 
round(20*log10((double)minpeak / int16_max)) : -90; if (peak_dB < INPUT_AUDIO_LEVEL_THRESHOLD) { if (m_silence_counter < INPUT_AUDIO_LEVEL_COUNT_SATURATION) { m_silence_counter++; } } else { if (m_silence_counter > 0) { m_silence_counter--; } } } void notifyUnderrun(void) { boost::mutex::scoped_lock lock(m_mutex); // Statistics num_underruns++; // State m_time_last_event = time(NULL); if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { m_glitch_counter++; } } void notifyOverrun(void) { boost::mutex::scoped_lock lock(m_mutex); // Statistics num_overruns++; // State m_time_last_event = time(NULL); if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { m_glitch_counter++; } } std::string encodeValuesJSON(void); std::string encodeStateJSON(void); input_state_t determineState(void); private: std::string m_name; /************ STATISTICS ***********/ // minimum and maximum buffer fill since last reset long min_fill_buffer; long max_fill_buffer; // counter of number of overruns and underruns since startup uint32_t num_underruns; uint32_t num_overruns; // peak audio levels (linear 16-bit PCM) for the two channels int peak_left; int peak_right; /************* STATE ***************/ /* Variables used for determining the input state */ int m_glitch_counter; // saturating counter int m_silence_counter; // saturating counter time_t m_time_last_event; time_t m_time_last_buffer_nonempty; bool m_buffer_empty; // The mutex that has to be held during all notify and readout mutable boost::mutex m_mutex; }; class ManagementServer { public: ManagementServer() : m_zmq_context(), m_zmq_sock(m_zmq_context, ZMQ_REP), m_running(false), m_fault(false), m_pending(false) { } ~ManagementServer() { m_running = false; m_fault = false; m_pending = false; // TODO notify m_thread.join(); } void open(int listenport) { m_listenport = listenport; if (m_listenport > 0) { m_thread = boost::thread(&ManagementServer::serverThread, this); } } /* Un-/Register a statistics data source */ void 
registerInput(InputStat* is); void unregisterInput(std::string id); /* Ask if there is a configuration request pending */ bool request_pending() { return m_pending; } /* Load a ptree given by the management server. * * Returns true if the ptree was updated */ bool retrieve_new_ptree(boost::property_tree::ptree& pt); /* Update the copy of the configuration property tree and notify the * update to the internal server thread. */ void update_ptree(const boost::property_tree::ptree& pt); bool fault_detected() { return m_fault; } void restart(void); private: void restart_thread(long); /******* Server ******/ zmq::context_t m_zmq_context; zmq::socket_t m_zmq_sock; // no copying (because of the thread) ManagementServer(const ManagementServer& other); void serverThread(void); void handle_message(zmq::message_t& zmq_message); bool handle_setptree(zmq::message_t& zmq_message, std::stringstream& answer); bool isInputRegistered(std::string& id); int m_listenport; // serverThread runs in a separate thread std::atomic<bool> m_running; std::atomic<bool> m_fault; boost::thread m_thread; boost::thread m_restarter_thread; /******* Statistics Data ********/ std::map<std::string, InputStat*> m_inputStats; /* Return a description of the configuration that will * allow to define what graphs to be created * * returns: a JSON encoded configuration */ std::string getStatConfigJSON(); /* Return the values for the statistics as defined in the configuration * * returns: JSON encoded statistics */ std::string getValuesJSON(); /* Return the state of each input * * returns: JSON encoded state */ std::string getStateJSON(); // mutex for accessing the map mutable boost::mutex m_statsmutex; /******** Configuration Data *******/ bool m_pending; bool m_retrieve_pending; boost::condition_variable m_condition; mutable boost::mutex m_configmutex; boost::property_tree::ptree m_pt; }; // If necessary construct the management server singleton and return // a reference to it ManagementServer& 
get_mgmt_server(); #endif