diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-05-09 16:07:01 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-05-09 16:07:01 +0200 |
commit | 0e5797c291d29587d5ea5beebe6430cb041903bd (patch) | |
tree | 1e7d6a833d983f27de0b0f0b426ca070825ed301 /src | |
parent | ebc799479b15f420702bdfc43f1e6f71160c5977 (diff) | |
download | dabmux-0e5797c291d29587d5ea5beebe6430cb041903bd.tar.gz dabmux-0e5797c291d29587d5ea5beebe6430cb041903bd.tar.bz2 dabmux-0e5797c291d29587d5ea5beebe6430cb041903bd.zip |
Add input state monitoring, change munin graphs
StatsServer now determines the input states.
Also, the underruns and overruns are counters in munin now, they
are not reset anymore.
Diffstat (limited to 'src')
-rw-r--r-- | src/DabMux.cpp | 7 | ||||
-rw-r--r-- | src/StatsServer.cpp | 345 | ||||
-rw-r--r-- | src/StatsServer.h | 232 |
3 files changed, 441 insertions, 143 deletions
diff --git a/src/DabMux.cpp b/src/DabMux.cpp index c090738..e698e89 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -1896,6 +1896,13 @@ int main(int argc, char *argv[]) etiLog.level(warn) << "Detected Remote Control fault, restarting it"; rc->restart(); } + + /* Same for statistics server */ + if (global_stats && fc->FCT == 249 && global_stats->fault_detected()) { + etiLog.level(warn) << + "Detected Statistics Server fault, restarting it"; + global_stats->restart(); + } } EXIT: diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp index c7cbc9d..eb30ebd 100644 --- a/src/StatsServer.cpp +++ b/src/StatsServer.cpp @@ -5,7 +5,8 @@ Copyright (C) 2014 Matthias P. Braendli http://mpb.li - A TCP Socket server that serves statistics for monitoring purposes. + A TCP Socket server that serves state information and statistics for + monitoring purposes. This server is very easy to integrate with munin http://munin-monitoring.org/ */ @@ -32,6 +33,8 @@ #include <stdint.h> #include <limits> #include <sstream> +#include <ctime> +#include <boost/thread.hpp> #include "StatsServer.h" #include "Log.h" @@ -54,73 +57,43 @@ void StatsServer::notifyBuffer(std::string id, long bufsize) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - if (bufsize > is.max_fill_buffer) { - is.max_fill_buffer = bufsize; - } - - if (bufsize < is.min_fill_buffer || - is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { - is.min_fill_buffer = bufsize; - } + if (isInputRegistered(id)) + m_inputStats[id].notifyBuffer(bufsize); } void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - if (peak_left > is.peak_left) { - is.peak_left = peak_left; - } - - if (peak_right > is.peak_right) { - is.peak_right = peak_right; - } + if (isInputRegistered(id)) + m_inputStats[id].notifyPeakLevels(peak_left, peak_right); } void StatsServer::notifyUnderrun(std::string id) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - is.num_underruns++; + if (isInputRegistered(id)) + m_inputStats[id].notifyUnderrun(); } void StatsServer::notifyOverrun(std::string id) { boost::mutex::scoped_lock lock(m_mutex); + if (isInputRegistered(id)) + m_inputStats[id].notifyOverrun(); +} + +bool StatsServer::isInputRegistered(std::string& id) +{ if (m_inputStats.count(id) == 0) { etiLog.level(error) << "Stats Server id '" << id << "' does was not registered"; - return; + return false; } - - InputStat& is = m_inputStats[id]; - is.num_overruns++; + return true; } std::string StatsServer::getConfigJSON() @@ -165,7 +138,7 @@ std::string StatsServer::getValuesJSON() } ss << " \"" << id << "\" : "; - ss << stats.encodeJSON(); + ss << stats.encodeValuesJSON(); stats.reset(); } @@ -173,106 +146,185 @@ std::string StatsServer::getValuesJSON() return ss.str(); } -void StatsServer::serverThread() + +std::string StatsServer::getStateJSON() { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "{ \"service\": \"" - "%s %s Stats Server\" }\n", - PACKAGE_NAME, PACKAGE_VERSION); - - - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << "Error opening Stats Server socket: " << - strerror(errno); - return; - } + std::ostringstream ss; + ss << "{\n"; - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); + std::map<std::string,InputStat>::iterator iter; + int i = 0; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + ++iter, i++) + { + const std::string& id = iter->first; + InputStat& stats = iter->second; - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << "Error binding Stats Server socket: " << - strerror(errno); - goto end_serverthread; + if (i > 0) { + ss << " ,\n"; + } + + ss << " \"" << id << "\" : "; + ss << stats.encodeStateJSON(); + stats.reset(); } - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << "Error listening on Stats Server socket: " << - strerror(errno); - goto end_serverthread; + ss << "}\n"; + + return ss.str(); +} + +void StatsServer::restart() +{ + m_restarter_thread = boost::thread(&StatsServer::restart_thread, + this, 0); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void StatsServer::restart_thread(long) +{ + m_running = false; + + if (m_listenport) { + m_thread.interrupt(); + m_thread.join(); } - while (m_running) - { - socklen_t cli_addr_len = sizeof(cli_addr); + m_thread = boost::thread(&StatsServer::serverThread, this); +} - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, (struct sockaddr *)&cli_addr, &cli_addr_len); - if (accepted_sock < 0) { - etiLog.level(warn) << "Stats Server cound not accept connection: " << +void StatsServer::serverThread() +{ + m_fault = false; + + try { + int accepted_sock; + char buffer[256]; + char welcome_msg[256]; + struct sockaddr_in serv_addr, cli_addr; + int n; + + int welcome_msg_len = snprintf(welcome_msg, 256, + "{ \"service\": \"" + "%s %s Stats Server\" }\n", + PACKAGE_NAME, +#if defined(GITVERSION) + GITVERSION +#else + PACKAGE_VERSION +#endif + ); + + + m_sock = socket(AF_INET, SOCK_STREAM, 0); + if (m_sock < 0) { + etiLog.level(error) << "Error opening Stats Server socket: " << strerror(errno); - continue; + m_fault = true; + return; } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << + + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 + serv_addr.sin_port = htons(m_listenport); + + if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + etiLog.level(error) << "Error binding Stats Server socket: " << strerror(errno); - close(accepted_sock); - continue; + goto end_serverthread; } - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << "Error reading from Stats Server socket " << + if (listen(m_sock, 5) < 0) { + etiLog.level(error) << "Error listening on Stats Server socket: " << strerror(errno); - close(accepted_sock); - continue; + goto end_serverthread; } - if (strcmp(buffer, "config\n") == 0) - { - std::string json = getConfigJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "values\n") == 0) - { - std::string json = getValuesJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else + m_running = true; + + while (m_running) { - int len = snprintf(buffer, 256, "Invalid command\n"); + socklen_t cli_addr_len = sizeof(cli_addr); + + /* Accept actual connection from the client */ + accepted_sock = accept(m_sock, + (struct sockaddr *)&cli_addr, + &cli_addr_len); - n = write(accepted_sock, buffer, len); + if (accepted_sock < 0) { + etiLog.level(warn) << "Stats Server cound not accept connection: " << + strerror(errno); + continue; + } + /* Send welcome message with version */ + n = write(accepted_sock, welcome_msg, welcome_msg_len); if (n < 0) { etiLog.level(warn) << "Error writing to Stats Server socket " << strerror(errno); + close(accepted_sock); + continue; + } + + /* receive command */ + memset(buffer, 0, 256); + int n = read(accepted_sock, buffer, 255); + if (n < 0) { + etiLog.level(warn) << "Error reading from Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + if (strcmp(buffer, "config\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getConfigJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "values\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getValuesJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "state\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getStateJSON(); + n = write(accepted_sock, json.c_str(), json.size()); } + else + { + int len = snprintf(buffer, 256, "Invalid command\n"); + + n = write(accepted_sock, buffer, len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + } + close(accepted_sock); + continue; + } + close(accepted_sock); - continue; } - close(accepted_sock); - } - end_serverthread: - close(m_sock); + m_fault = true; + close(m_sock); + + } + catch (std::exception& e) { + etiLog.level(error) << "Statistics server caught exception: " << e.what(); + m_fault = true; + } } -std::string InputStat::encodeJSON() +std::string InputStat::encodeValuesJSON() { std::ostringstream ss; @@ -295,3 +347,62 @@ std::string InputStat::encodeJSON() return ss.str(); } +std::string InputStat::encodeStateJSON() +{ + std::ostringstream ss; + + ss << "{ \"state\" : "; + + switch (determineState()) { + case NoData: + ss << "\"NoData\""; + break; + case Unstable: + ss << "\"Unstable\""; + break; + case Streaming: + ss << "\"Streaming\""; + break; + default: + ss << "\"Unknown\""; + } + + ss << " }"; + + return ss.str(); +} + +input_state_t InputStat::determineState(void) +{ + time_t now = time(NULL); + input_state_t state; + + /* if the last event was more that INPUT_COUNTER_RESET_TIME + * minutes ago, the timeout has expired. We can reset our + * glitch counter. + */ + if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) { + m_glitch_counter = 0; + } + + // STATE CALCULATION + + /* If the buffer has been empty for more than + * INPUT_NODATA_TIMEOUT, we go to the NoData state. + */ + if (m_buffer_empty && + now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) { + state = NoData; + } + + /* Otherwise, the state depends on the glitch counter */ + else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) { + state = Unstable; + } + else { + state = Streaming; + } + + return state; +} + diff --git a/src/StatsServer.h b/src/StatsServer.h index 9eb08df..1a2993a 100644 --- a/src/StatsServer.h +++ b/src/StatsServer.h @@ -5,7 +5,8 @@ Copyright (C) 2014 Matthias P. Braendli http://mpb.li - A TCP Socket server that serves statistics for monitoring purposes. + A TCP Socket server that serves state information and statistics for + monitoring purposes. This server is very easy to integrate with munin http://munin-monitoring.org/ @@ -13,8 +14,11 @@ The TCP Server responds in JSON, and accepts two commands: - config - -and- - values + Inspired by the munin equivalent + + - state + Returns the state of each input */ /* @@ -50,47 +54,207 @@ #include <string> #include <map> #include <boost/thread.hpp> +#include <ctime> #define MIN_FILL_BUFFER_UNDEF (-1) -struct InputStat +/*** State handing ***/ +/* An input can be in one of the following three states: + */ +enum input_state_t +{ + /* The input is waiting for data, all buffers are empty */ + NoData, + + /* The input is running, but has seen many underruns or overruns */ + Unstable, + + /* The input is running stable */ + Streaming +}; + + +/* The delay after which the glitch counter is reset + */ +#define INPUT_COUNTER_RESET_TIME 30 // minutes + +/* How many glitches we tolerate in Streaming state before + * we consider the input Unstable + */ +#define INPUT_UNSTABLE_THRESHOLD 3 + +/* For how long the input buffers must be empty before we move an input to the + * NoData state. + */ +#define INPUT_NODATA_TIMEOUT 30 // seconds + +/* An example of how the state changes work. + * The timeout is set to expire in 30 minutes + * at each under-/overrun. + * + * The glitch counter is increased by one for each glitch (can be a + * saturating counter), and set to zero when the counter timeout expires. + * + * The state is then simply depending on the glitch counter value. + * + * Graphical example: + + state STREAMING | UNSTABLE | STREAMING + xruns U U U + glitch + counter 0 1 2 3 0 + reset + timeout \ |\ |\ |\ + \ | \ | \ | \ + \ | \ | \ | \ + \| \ | \| \ + ` \| ` \ + ` \ + \ + \ + \ + \ + timeout expires ___________________\ + <--30min--> + */ + +/* InputStat takes care of + * - saving the statistics for graphing + * - calculating the state of the input for monitoring + */ +class InputStat { - InputStat() { reset(); } + public: + InputStat() { + /* Statistics */ + num_underruns = 0; + num_overruns = 0; + + /* State handling */ + time_t now = time(NULL); + m_time_last_event = now; + m_time_last_buffer_nonempty = 0; + m_buffer_empty = true; + m_glitch_counter = 0; + + reset(); + } + + + // Gets called each time the statistics are transmitted, + // and resets the counters to zero + void reset(void) + { + min_fill_buffer = MIN_FILL_BUFFER_UNDEF; + max_fill_buffer = 0; + + peak_left = 0; + peak_right = 0; + } + + /* This function is called for every frame read by + * the multiplexer + */ + void notifyBuffer(long bufsize) + { + // Statistics + if (bufsize > max_fill_buffer) { + max_fill_buffer = bufsize; + } + + if (bufsize < min_fill_buffer || + min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { + min_fill_buffer = bufsize; + } + + // State + m_buffer_empty = (bufsize == 0); + if (!m_buffer_empty) { + m_time_last_buffer_nonempty = time(NULL); + } + } - // minimum and maximum buffer fill since last reset - long min_fill_buffer; - long max_fill_buffer; + void notifyPeakLevels(int peak_left, int peak_right) + { + // Statistics + if (peak_left > this->peak_left) { + this->peak_left = peak_left; + } + + if (peak_right > this->peak_right) { + this->peak_right = peak_right; + } + + // State + // TODO add silence detection + } - // number of overruns and underruns since last reset - long num_underruns; - long num_overruns; + void notifyUnderrun(void) + { + // Statistics + num_underruns++; - // peak audio levels (linear 16-bit PCM) for the two channels - int peak_left; - int peak_right; + // State + m_time_last_event = time(NULL); + if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { + m_glitch_counter++; + } + } - void reset() - { - min_fill_buffer = MIN_FILL_BUFFER_UNDEF; - max_fill_buffer = 0; + void notifyOverrun(void) + { + // Statistics + num_overruns++; - num_underruns = 0; - num_overruns = 0; + // State + m_time_last_event = time(NULL); + if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { + m_glitch_counter++; + } + } - peak_left = 0; - peak_right = 0; - } + std::string encodeValuesJSON(void); + + std::string encodeStateJSON(void); + + input_state_t determineState(void); + + private: + /************ STATISTICS ***********/ + // minimum and maximum buffer fill since last reset + long min_fill_buffer; + long max_fill_buffer; + + // counter of number of overruns and underruns since startup + uint32_t num_underruns; + uint32_t num_overruns; + + // peak audio levels (linear 16-bit PCM) for the two channels + int peak_left; + int peak_right; + + /************* STATE ***************/ + /* Variables used for determining the input state */ + int m_glitch_counter; // saturating counter + time_t m_time_last_event; + time_t m_time_last_buffer_nonempty; + bool m_buffer_empty; - std::string encodeJSON(); }; class StatsServer { public: - StatsServer() : m_listenport(0), m_running(false) {} + StatsServer() : + m_listenport(0), + m_running(false), + m_fault(false) + { } + StatsServer(int listen_port) : m_listenport(listen_port), - m_running(true), + m_running(false), + m_fault(false), m_thread(&StatsServer::serverThread, this) { m_sock = 0; @@ -99,6 +263,7 @@ class StatsServer ~StatsServer() { m_running = false; + m_fault = false; if (m_sock) { close(m_sock); } @@ -107,6 +272,9 @@ class StatsServer void registerInput(std::string id); + bool fault_detected() { return m_fault; } + void restart(void); + /* The following notify functions are used by the input to * inform the StatsServer about new values */ @@ -116,17 +284,23 @@ class StatsServer void notifyOverrun(std::string id); private: + void restart_thread(long); + /******* TCP Socket Server ******/ // no copying (because of the thread) StatsServer(const StatsServer& other); - void serverThread(); + void serverThread(void); + + bool isInputRegistered(std::string& id); int m_listenport; // serverThread runs in a separate thread bool m_running; + bool m_fault; boost::thread m_thread; + boost::thread m_restarter_thread; int m_sock; @@ -146,6 +320,12 @@ class StatsServer */ std::string getValuesJSON(); + /* Return the state of each input + * + * returns: JSON encoded state + */ + std::string getStateJSON(); + // mutex for accessing the map mutable boost::mutex m_mutex; }; |