From 0e5797c291d29587d5ea5beebe6430cb041903bd Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 9 May 2014 16:07:01 +0200 Subject: Add input state monitoring, change munin graphs StatsServer now determines the input states. Also, the underruns and overruns are counters in munin now, they are not reset anymore. --- src/DabMux.cpp | 7 ++ src/StatsServer.cpp | 345 ++++++++++++++++++++++++++++++++++------------------ src/StatsServer.h | 232 +++++++++++++++++++++++++++++++---- 3 files changed, 441 insertions(+), 143 deletions(-) (limited to 'src') diff --git a/src/DabMux.cpp b/src/DabMux.cpp index c090738..e698e89 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -1896,6 +1896,13 @@ int main(int argc, char *argv[]) etiLog.level(warn) << "Detected Remote Control fault, restarting it"; rc->restart(); } + + /* Same for statistics server */ + if (global_stats && fc->FCT == 249 && global_stats->fault_detected()) { + etiLog.level(warn) << + "Detected Statistics Server fault, restarting it"; + global_stats->restart(); + } } EXIT: diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp index c7cbc9d..eb30ebd 100644 --- a/src/StatsServer.cpp +++ b/src/StatsServer.cpp @@ -5,7 +5,8 @@ Copyright (C) 2014 Matthias P. Braendli http://mpb.li - A TCP Socket server that serves statistics for monitoring purposes. + A TCP Socket server that serves state information and statistics for + monitoring purposes. This server is very easy to integrate with munin http://munin-monitoring.org/ */ @@ -32,6 +33,8 @@ #include #include #include +#include +#include #include "StatsServer.h" #include "Log.h" @@ -54,73 +57,43 @@ void StatsServer::notifyBuffer(std::string id, long bufsize) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - if (bufsize > is.max_fill_buffer) { - is.max_fill_buffer = bufsize; - } - - if (bufsize < is.min_fill_buffer || - is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { - is.min_fill_buffer = bufsize; - } + if (isInputRegistered(id)) + m_inputStats[id].notifyBuffer(bufsize); } void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - if (peak_left > is.peak_left) { - is.peak_left = peak_left; - } - - if (peak_right > is.peak_right) { - is.peak_right = peak_right; - } + if (isInputRegistered(id)) + m_inputStats[id].notifyPeakLevels(peak_left, peak_right); } void StatsServer::notifyUnderrun(std::string id) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - is.num_underruns++; + if (isInputRegistered(id)) + m_inputStats[id].notifyUnderrun(); } void StatsServer::notifyOverrun(std::string id) { boost::mutex::scoped_lock lock(m_mutex); + if (isInputRegistered(id)) + m_inputStats[id].notifyOverrun(); +} + +bool StatsServer::isInputRegistered(std::string& id) +{ if (m_inputStats.count(id) == 0) { etiLog.level(error) << "Stats Server id '" << id << "' does was not registered"; - return; + return false; } - - InputStat& is = m_inputStats[id]; - is.num_overruns++; + return true; } std::string StatsServer::getConfigJSON() @@ -165,7 +138,7 @@ std::string StatsServer::getValuesJSON() } ss << " \"" << id << "\" : "; - ss << stats.encodeJSON(); + ss << stats.encodeValuesJSON(); stats.reset(); } @@ -173,106 +146,185 @@ std::string StatsServer::getValuesJSON() return ss.str(); } -void StatsServer::serverThread() + +std::string StatsServer::getStateJSON() { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "{ \"service\": \"" - "%s %s Stats Server\" }\n", - PACKAGE_NAME, PACKAGE_VERSION); - - - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << "Error opening Stats Server socket: " << - strerror(errno); - return; - } + std::ostringstream ss; + ss << "{\n"; - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); + std::map::iterator iter; + int i = 0; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + ++iter, i++) + { + const std::string& id = iter->first; + InputStat& stats = iter->second; - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << "Error binding Stats Server socket: " << - strerror(errno); - goto end_serverthread; + if (i > 0) { + ss << " ,\n"; + } + + ss << " \"" << id << "\" : "; + ss << stats.encodeStateJSON(); + stats.reset(); } - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << "Error listening on Stats Server socket: " << - strerror(errno); - goto end_serverthread; + ss << "}\n"; + + return ss.str(); +} + +void StatsServer::restart() +{ + m_restarter_thread = boost::thread(&StatsServer::restart_thread, + this, 0); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void StatsServer::restart_thread(long) +{ + m_running = false; + + if (m_listenport) { + m_thread.interrupt(); + m_thread.join(); } - while (m_running) - { - socklen_t cli_addr_len = sizeof(cli_addr); + m_thread = boost::thread(&StatsServer::serverThread, this); +} - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, (struct sockaddr *)&cli_addr, &cli_addr_len); - if (accepted_sock < 0) { - etiLog.level(warn) << "Stats Server cound not accept connection: " << +void StatsServer::serverThread() +{ + m_fault = false; + + try { + int accepted_sock; + char buffer[256]; + char welcome_msg[256]; + struct sockaddr_in serv_addr, cli_addr; + int n; + + int welcome_msg_len = snprintf(welcome_msg, 256, + "{ \"service\": \"" + "%s %s Stats Server\" }\n", + PACKAGE_NAME, +#if defined(GITVERSION) + GITVERSION +#else + PACKAGE_VERSION +#endif + ); + + + m_sock = socket(AF_INET, SOCK_STREAM, 0); + if (m_sock < 0) { + etiLog.level(error) << "Error opening Stats Server socket: " << strerror(errno); - continue; + m_fault = true; + return; } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << + + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 + serv_addr.sin_port = htons(m_listenport); + + if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + etiLog.level(error) << "Error binding Stats Server socket: " << strerror(errno); - close(accepted_sock); - continue; + goto end_serverthread; } - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << "Error reading from Stats Server socket " << + if (listen(m_sock, 5) < 0) { + etiLog.level(error) << "Error listening on Stats Server socket: " << strerror(errno); - close(accepted_sock); - continue; + goto end_serverthread; } - if (strcmp(buffer, "config\n") == 0) - { - std::string json = getConfigJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "values\n") == 0) - { - std::string json = getValuesJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else + m_running = true; + + while (m_running) { - int len = snprintf(buffer, 256, "Invalid command\n"); + socklen_t cli_addr_len = sizeof(cli_addr); + + /* Accept actual connection from the client */ + accepted_sock = accept(m_sock, + (struct sockaddr *)&cli_addr, + &cli_addr_len); - n = write(accepted_sock, buffer, len); + if (accepted_sock < 0) { + etiLog.level(warn) << "Stats Server cound not accept connection: " << + strerror(errno); + continue; + } + /* Send welcome message with version */ + n = write(accepted_sock, welcome_msg, welcome_msg_len); if (n < 0) { etiLog.level(warn) << "Error writing to Stats Server socket " << strerror(errno); + close(accepted_sock); + continue; + } + + /* receive command */ + memset(buffer, 0, 256); + int n = read(accepted_sock, buffer, 255); + if (n < 0) { + etiLog.level(warn) << "Error reading from Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + if (strcmp(buffer, "config\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getConfigJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "values\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getValuesJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "state\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getStateJSON(); + n = write(accepted_sock, json.c_str(), json.size()); } + else + { + int len = snprintf(buffer, 256, "Invalid command\n"); + + n = write(accepted_sock, buffer, len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + } + close(accepted_sock); + continue; + } + close(accepted_sock); - continue; } - close(accepted_sock); - } - end_serverthread: - close(m_sock); + m_fault = true; + close(m_sock); + + } + catch (std::exception& e) { + etiLog.level(error) << "Statistics server caught exception: " << e.what(); + m_fault = true; + } } -std::string InputStat::encodeJSON() +std::string InputStat::encodeValuesJSON() { std::ostringstream ss; @@ -295,3 +347,62 @@ std::string InputStat::encodeJSON() return ss.str(); } +std::string InputStat::encodeStateJSON() +{ + std::ostringstream ss; + + ss << "{ \"state\" : "; + + switch (determineState()) { + case NoData: + ss << "\"NoData\""; + break; + case Unstable: + ss << "\"Unstable\""; + break; + case Streaming: + ss << "\"Streaming\""; + break; + default: + ss << "\"Unknown\""; + } + + ss << " }"; + + return ss.str(); +} + +input_state_t InputStat::determineState(void) +{ + time_t now = time(NULL); + input_state_t state; + + /* if the last event was more that INPUT_COUNTER_RESET_TIME + * minutes ago, the timeout has expired. We can reset our + * glitch counter. + */ + if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) { + m_glitch_counter = 0; + } + + // STATE CALCULATION + + /* If the buffer has been empty for more than + * INPUT_NODATA_TIMEOUT, we go to the NoData state. + */ + if (m_buffer_empty && + now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) { + state = NoData; + } + + /* Otherwise, the state depends on the glitch counter */ + else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) { + state = Unstable; + } + else { + state = Streaming; + } + + return state; +} + diff --git a/src/StatsServer.h b/src/StatsServer.h index 9eb08df..1a2993a 100644 --- a/src/StatsServer.h +++ b/src/StatsServer.h @@ -5,7 +5,8 @@ Copyright (C) 2014 Matthias P. Braendli http://mpb.li - A TCP Socket server that serves statistics for monitoring purposes. + A TCP Socket server that serves state information and statistics for + monitoring purposes. This server is very easy to integrate with munin http://munin-monitoring.org/ @@ -13,8 +14,11 @@ The TCP Server responds in JSON, and accepts two commands: - config - -and- - values + Inspired by the munin equivalent + + - state + Returns the state of each input */ /* @@ -50,47 +54,207 @@ #include #include #include +#include #define MIN_FILL_BUFFER_UNDEF (-1) -struct InputStat +/*** State handing ***/ +/* An input can be in one of the following three states: + */ +enum input_state_t +{ + /* The input is waiting for data, all buffers are empty */ + NoData, + + /* The input is running, but has seen many underruns or overruns */ + Unstable, + + /* The input is running stable */ + Streaming +}; + + +/* The delay after which the glitch counter is reset + */ +#define INPUT_COUNTER_RESET_TIME 30 // minutes + +/* How many glitches we tolerate in Streaming state before + * we consider the input Unstable + */ +#define INPUT_UNSTABLE_THRESHOLD 3 + +/* For how long the input buffers must be empty before we move an input to the + * NoData state. + */ +#define INPUT_NODATA_TIMEOUT 30 // seconds + +/* An example of how the state changes work. + * The timeout is set to expire in 30 minutes + * at each under-/overrun. + * + * The glitch counter is increased by one for each glitch (can be a + * saturating counter), and set to zero when the counter timeout expires. + * + * The state is then simply depending on the glitch counter value. + * + * Graphical example: + + state STREAMING | UNSTABLE | STREAMING + xruns U U U + glitch + counter 0 1 2 3 0 + reset + timeout \ |\ |\ |\ + \ | \ | \ | \ + \ | \ | \ | \ + \| \ | \| \ + ` \| ` \ + ` \ + \ + \ + \ + \ + timeout expires ___________________\ + <--30min--> + */ + +/* InputStat takes care of + * - saving the statistics for graphing + * - calculating the state of the input for monitoring + */ +class InputStat { - InputStat() { reset(); } + public: + InputStat() { + /* Statistics */ + num_underruns = 0; + num_overruns = 0; + + /* State handling */ + time_t now = time(NULL); + m_time_last_event = now; + m_time_last_buffer_nonempty = 0; + m_buffer_empty = true; + m_glitch_counter = 0; + + reset(); + } + + + // Gets called each time the statistics are transmitted, + // and resets the counters to zero + void reset(void) + { + min_fill_buffer = MIN_FILL_BUFFER_UNDEF; + max_fill_buffer = 0; + + peak_left = 0; + peak_right = 0; + } + + /* This function is called for every frame read by + * the multiplexer + */ + void notifyBuffer(long bufsize) + { + // Statistics + if (bufsize > max_fill_buffer) { + max_fill_buffer = bufsize; + } + + if (bufsize < min_fill_buffer || + min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { + min_fill_buffer = bufsize; + } + + // State + m_buffer_empty = (bufsize == 0); + if (!m_buffer_empty) { + m_time_last_buffer_nonempty = time(NULL); + } + } - // minimum and maximum buffer fill since last reset - long min_fill_buffer; - long max_fill_buffer; + void notifyPeakLevels(int peak_left, int peak_right) + { + // Statistics + if (peak_left > this->peak_left) { + this->peak_left = peak_left; + } + + if (peak_right > this->peak_right) { + this->peak_right = peak_right; + } + + // State + // TODO add silence detection + } - // number of overruns and underruns since last reset - long num_underruns; - long num_overruns; + void notifyUnderrun(void) + { + // Statistics + num_underruns++; - // peak audio levels (linear 16-bit PCM) for the two channels - int peak_left; - int peak_right; + // State + m_time_last_event = time(NULL); + if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { + m_glitch_counter++; + } + } - void reset() - { - min_fill_buffer = MIN_FILL_BUFFER_UNDEF; - max_fill_buffer = 0; + void notifyOverrun(void) + { + // Statistics + num_overruns++; - num_underruns = 0; - num_overruns = 0; + // State + m_time_last_event = time(NULL); + if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { + m_glitch_counter++; + } + } - peak_left = 0; - peak_right = 0; - } + std::string encodeValuesJSON(void); + + std::string encodeStateJSON(void); + + input_state_t determineState(void); + + private: + /************ STATISTICS ***********/ + // minimum and maximum buffer fill since last reset + long min_fill_buffer; + long max_fill_buffer; + + // counter of number of overruns and underruns since startup + uint32_t num_underruns; + uint32_t num_overruns; + + // peak audio levels (linear 16-bit PCM) for the two channels + int peak_left; + int peak_right; + + /************* STATE ***************/ + /* Variables used for determining the input state */ + int m_glitch_counter; // saturating counter + time_t m_time_last_event; + time_t m_time_last_buffer_nonempty; + bool m_buffer_empty; - std::string encodeJSON(); }; class StatsServer { public: - StatsServer() : m_listenport(0), m_running(false) {} + StatsServer() : + m_listenport(0), + m_running(false), + m_fault(false) + { } + StatsServer(int listen_port) : m_listenport(listen_port), - m_running(true), + m_running(false), + m_fault(false), m_thread(&StatsServer::serverThread, this) { m_sock = 0; @@ -99,6 +263,7 @@ class StatsServer ~StatsServer() { m_running = false; + m_fault = false; if (m_sock) { close(m_sock); } @@ -107,6 +272,9 @@ class StatsServer void registerInput(std::string id); + bool fault_detected() { return m_fault; } + void restart(void); + /* The following notify functions are used by the input to * inform the StatsServer about new values */ @@ -116,17 +284,23 @@ class StatsServer void notifyOverrun(std::string id); private: + void restart_thread(long); + /******* TCP Socket Server ******/ // no copying (because of the thread) StatsServer(const StatsServer& other); - void serverThread(); + void serverThread(void); + + bool isInputRegistered(std::string& id); int m_listenport; // serverThread runs in a separate thread bool m_running; + bool m_fault; boost::thread m_thread; + boost::thread m_restarter_thread; int m_sock; @@ -146,6 +320,12 @@ class StatsServer */ std::string getValuesJSON(); + /* Return the state of each input + * + * returns: JSON encoded state + */ + std::string getStateJSON(); + // mutex for accessing the map mutable boost::mutex m_mutex; }; -- cgit v1.2.3