/* Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org A TCP Socket server that serves state information and statistics for monitoring purposes, and also serves the internal configuration property tree. This statistics server is very easy to integrate with munin http://munin-monitoring.org/ but is not specific to it. The TCP Server responds in JSON, and accepts the commands: - config - values Inspired by the munin equivalent - state Returns the state of each input - getptree Returns the internal boost property_tree that contains the multiplexer configuration DB. */ /* This file is part of ODR-DabMux. ODR-DabMux is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ODR-DabMux is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ODR-DabMux. If not, see . */ #ifndef __MANAGEMENT_SERVER_H #define __MANAGEMENT_SERVER_H #ifdef HAVE_CONFIG_H # include "config.h" #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #define MIN_FILL_BUFFER_UNDEF (-1) /*** State handing ***/ /* An input can be in one of the following three states: */ enum input_state_t { /* The input is waiting for data, all buffers are empty */ NoData, /* The input is running, but has seen many underruns or overruns recently */ Unstable, /* The input is running, but the audio level is too low, or has * been too low recently */ Silence, /* The input is running stable */ Streaming }; /* The delay after which the glitch counter is reset */ #define INPUT_COUNTER_RESET_TIME 30 // minutes /* How many glitches we tolerate in Streaming state before * we consider the input Unstable */ #define INPUT_UNSTABLE_THRESHOLD 3 /* For how long the input buffers must be empty before we move an input to the * NoData state. */ #define INPUT_NODATA_TIMEOUT 30 // seconds /* For silence detection, we count the number of occurrences the audio level * falls below a threshold. * * The counter is decreased for each frame that has good audio level. * * The counter saturates, and this value defines for how long the * input will be considered silent after a cut. * * If the count reaches a certain value, the input changes state * to Silence. */ #define INPUT_AUDIO_LEVEL_THRESHOLD -50 // dB #define INPUT_AUDIO_LEVEL_SILENCE_COUNT 100 // superframes (120ms) #define INPUT_AUDIO_LEVEL_COUNT_SATURATION 500 // superframes (120ms) /* An example of how the state changes work. * The timeout is set to expire in 30 minutes * at each under-/overrun. * * The glitch counter is increased by one for each glitch (can be a * saturating counter), and set to zero when the counter timeout expires. * * The state is then simply depending on the glitch counter value. * * Graphical example: state STREAMING | UNSTABLE | STREAMING xruns U U U glitch counter 0 1 2 3 0 reset timeout \ |\ |\ |\ \ | \ | \ | \ \ | \ | \ | \ \| \ | \| \ ` \| ` \ ` \ \ \ \ \ timeout expires ___________________\ <--30min--> */ /* InputStat takes care of * - saving the statistics for graphing * - calculating the state of the input for monitoring */ class InputStat { public: InputStat(std::string name) : m_name(name) { /* Statistics */ num_underruns = 0; num_overruns = 0; /* State handling */ time_t now = time(NULL); m_time_last_event = now; m_time_last_buffer_nonempty = 0; m_buffer_empty = true; m_glitch_counter = 0; m_silence_counter = 0; reset(); } void registerAtServer(void); ~InputStat(); // Gets called each time the statistics are transmitted, // and resets the counters to zero void reset(void) { min_fill_buffer = MIN_FILL_BUFFER_UNDEF; max_fill_buffer = 0; peak_left = 0; peak_right = 0; } std::string& get_name(void) { return m_name; } /* This function is called for every frame read by * the multiplexer */ void notifyBuffer(long bufsize) { boost::mutex::scoped_lock lock(m_mutex); // Statistics if (bufsize > max_fill_buffer) { max_fill_buffer = bufsize; } if (bufsize < min_fill_buffer || min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { min_fill_buffer = bufsize; } // State m_buffer_empty = (bufsize == 0); if (!m_buffer_empty) { m_time_last_buffer_nonempty = time(NULL); } } void notifyPeakLevels(int peak_left, int peak_right) { boost::mutex::scoped_lock lock(m_mutex); // Statistics if (peak_left > this->peak_left) { this->peak_left = peak_left; } if (peak_right > this->peak_right) { this->peak_right = peak_right; } // State // using the smallest of the two channels // allows us to detect if only one channel // is silent. int minpeak = peak_left < peak_right ? peak_left : peak_right; const int16_t int16_max = std::numeric_limits::max(); int peak_dB = minpeak ? round(20*log10((double)minpeak / int16_max)) : -90; if (peak_dB < INPUT_AUDIO_LEVEL_THRESHOLD) { if (m_silence_counter < INPUT_AUDIO_LEVEL_COUNT_SATURATION) { m_silence_counter++; } } else { if (m_silence_counter > 0) { m_silence_counter--; } } } void notifyUnderrun(void) { boost::mutex::scoped_lock lock(m_mutex); // Statistics num_underruns++; // State m_time_last_event = time(NULL); if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { m_glitch_counter++; } } void notifyOverrun(void) { boost::mutex::scoped_lock lock(m_mutex); // Statistics num_overruns++; // State m_time_last_event = time(NULL); if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { m_glitch_counter++; } } std::string encodeValuesJSON(void); std::string encodeStateJSON(void); input_state_t determineState(void); private: std::string m_name; /************ STATISTICS ***********/ // minimum and maximum buffer fill since last reset long min_fill_buffer; long max_fill_buffer; // counter of number of overruns and underruns since startup uint32_t num_underruns; uint32_t num_overruns; // peak audio levels (linear 16-bit PCM) for the two channels int peak_left; int peak_right; /************* STATE ***************/ /* Variables used for determining the input state */ int m_glitch_counter; // saturating counter int m_silence_counter; // saturating counter time_t m_time_last_event; time_t m_time_last_buffer_nonempty; bool m_buffer_empty; // The mutex that has to be held during all notify and readout mutable boost::mutex m_mutex; }; class ManagementServer { public: ManagementServer() : m_running(false), m_fault(false), m_pending(false) { } ~ManagementServer() { m_running = false; m_fault = false; m_pending = false; if (m_sock) { close(m_sock); m_thread.interrupt(); } m_thread.join(); } void open(int listenport) { m_listenport = listenport; if (m_listenport > 0) { m_sock = 0; m_thread = boost::thread(&ManagementServer::serverThread, this); } } /* Un-/Register a statistics data source */ void registerInput(InputStat* is); void unregisterInput(std::string id); /* Ask if there is a configuration request pending */ bool request_pending() { return m_pending; } /* Load a ptree given by the management server. * * Returns true if the ptree was updated */ bool retrieve_new_ptree(boost::property_tree::ptree& pt); /* Update the copy of the configuration property tree and notify the * update to the internal server thread. */ void update_ptree(const boost::property_tree::ptree& pt); bool fault_detected() { return m_fault; } void restart(void); private: void restart_thread(long); /******* TCP Socket Server ******/ // no copying (because of the thread) ManagementServer(const ManagementServer& other); void serverThread(void); bool isInputRegistered(std::string& id); int m_listenport; // serverThread runs in a separate thread bool m_running; bool m_fault; boost::thread m_thread; boost::thread m_restarter_thread; int m_sock; /******* Statistics Data ********/ std::map m_inputStats; /* Return a description of the configuration that will * allow to define what graphs to be created * * returns: a JSON encoded configuration */ std::string getStatConfigJSON(); /* Return the values for the statistics as defined in the configuration * * returns: JSON encoded statistics */ std::string getValuesJSON(); /* Return the state of each input * * returns: JSON encoded state */ std::string getStateJSON(); // mutex for accessing the map mutable boost::mutex m_statsmutex; /******** Configuration Data *******/ bool m_pending; bool m_retrieve_pending; boost::condition_variable m_condition; mutable boost::mutex m_configmutex; boost::property_tree::ptree m_pt; }; // If necessary construct the management server singleton and return // a reference to it ManagementServer& get_mgmt_server(); #endif