From 0e5797c291d29587d5ea5beebe6430cb041903bd Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 9 May 2014 16:07:01 +0200 Subject: Add input state monitoring, change munin graphs StatsServer now determines the input states. Also, the underruns and overruns are counters in munin now, they are not reset anymore. --- src/StatsServer.cpp | 345 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 228 insertions(+), 117 deletions(-) (limited to 'src/StatsServer.cpp') diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp index c7cbc9d..eb30ebd 100644 --- a/src/StatsServer.cpp +++ b/src/StatsServer.cpp @@ -5,7 +5,8 @@ Copyright (C) 2014 Matthias P. Braendli http://mpb.li - A TCP Socket server that serves statistics for monitoring purposes. + A TCP Socket server that serves state information and statistics for + monitoring purposes. This server is very easy to integrate with munin http://munin-monitoring.org/ */ @@ -32,6 +33,8 @@ #include #include #include +#include +#include #include "StatsServer.h" #include "Log.h" @@ -54,73 +57,43 @@ void StatsServer::notifyBuffer(std::string id, long bufsize) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - if (bufsize > is.max_fill_buffer) { - is.max_fill_buffer = bufsize; - } - - if (bufsize < is.min_fill_buffer || - is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { - is.min_fill_buffer = bufsize; - } + if (isInputRegistered(id)) + m_inputStats[id].notifyBuffer(bufsize); } void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - if (peak_left > is.peak_left) { - is.peak_left = peak_left; - } - - if (peak_right > is.peak_right) { - is.peak_right = peak_right; - } + if (isInputRegistered(id)) + m_inputStats[id].notifyPeakLevels(peak_left, peak_right); } void StatsServer::notifyUnderrun(std::string id) { boost::mutex::scoped_lock lock(m_mutex); - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return; - } - - InputStat& is = m_inputStats[id]; - is.num_underruns++; + if (isInputRegistered(id)) + m_inputStats[id].notifyUnderrun(); } void StatsServer::notifyOverrun(std::string id) { boost::mutex::scoped_lock lock(m_mutex); + if (isInputRegistered(id)) + m_inputStats[id].notifyOverrun(); +} + +bool StatsServer::isInputRegistered(std::string& id) +{ if (m_inputStats.count(id) == 0) { etiLog.level(error) << "Stats Server id '" << id << "' does was not registered"; - return; + return false; } - - InputStat& is = m_inputStats[id]; - is.num_overruns++; + return true; } std::string StatsServer::getConfigJSON() @@ -165,7 +138,7 @@ std::string StatsServer::getValuesJSON() } ss << " \"" << id << "\" : "; - ss << stats.encodeJSON(); + ss << stats.encodeValuesJSON(); stats.reset(); } @@ -173,106 +146,185 @@ std::string StatsServer::getValuesJSON() return ss.str(); } -void StatsServer::serverThread() + +std::string StatsServer::getStateJSON() { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "{ \"service\": \"" - "%s %s Stats Server\" }\n", - PACKAGE_NAME, PACKAGE_VERSION); - - - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << "Error opening Stats Server socket: " << - strerror(errno); - return; - } + std::ostringstream ss; + ss << "{\n"; - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); + std::map::iterator iter; + int i = 0; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + ++iter, i++) + { + const std::string& id = iter->first; + InputStat& stats = iter->second; - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << "Error binding Stats Server socket: " << - strerror(errno); - goto end_serverthread; + if (i > 0) { + ss << " ,\n"; + } + + ss << " \"" << id << "\" : "; + ss << stats.encodeStateJSON(); + stats.reset(); } - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << "Error listening on Stats Server socket: " << - strerror(errno); - goto end_serverthread; + ss << "}\n"; + + return ss.str(); +} + +void StatsServer::restart() +{ + m_restarter_thread = boost::thread(&StatsServer::restart_thread, + this, 0); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void StatsServer::restart_thread(long) +{ + m_running = false; + + if (m_listenport) { + m_thread.interrupt(); + m_thread.join(); } - while (m_running) - { - socklen_t cli_addr_len = sizeof(cli_addr); + m_thread = boost::thread(&StatsServer::serverThread, this); +} - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, (struct sockaddr *)&cli_addr, &cli_addr_len); - if (accepted_sock < 0) { - etiLog.level(warn) << "Stats Server cound not accept connection: " << +void StatsServer::serverThread() +{ + m_fault = false; + + try { + int accepted_sock; + char buffer[256]; + char welcome_msg[256]; + struct sockaddr_in serv_addr, cli_addr; + int n; + + int welcome_msg_len = snprintf(welcome_msg, 256, + "{ \"service\": \"" + "%s %s Stats Server\" }\n", + PACKAGE_NAME, +#if defined(GITVERSION) + GITVERSION +#else + PACKAGE_VERSION +#endif + ); + + + m_sock = socket(AF_INET, SOCK_STREAM, 0); + if (m_sock < 0) { + etiLog.level(error) << "Error opening Stats Server socket: " << strerror(errno); - continue; + m_fault = true; + return; } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << + + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 + serv_addr.sin_port = htons(m_listenport); + + if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + etiLog.level(error) << "Error binding Stats Server socket: " << strerror(errno); - close(accepted_sock); - continue; + goto end_serverthread; } - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << "Error reading from Stats Server socket " << + if (listen(m_sock, 5) < 0) { + etiLog.level(error) << "Error listening on Stats Server socket: " << strerror(errno); - close(accepted_sock); - continue; + goto end_serverthread; } - if (strcmp(buffer, "config\n") == 0) - { - std::string json = getConfigJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "values\n") == 0) - { - std::string json = getValuesJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else + m_running = true; + + while (m_running) { - int len = snprintf(buffer, 256, "Invalid command\n"); + socklen_t cli_addr_len = sizeof(cli_addr); + + /* Accept actual connection from the client */ + accepted_sock = accept(m_sock, + (struct sockaddr *)&cli_addr, + &cli_addr_len); - n = write(accepted_sock, buffer, len); + if (accepted_sock < 0) { + etiLog.level(warn) << "Stats Server cound not accept connection: " << + strerror(errno); + continue; + } + /* Send welcome message with version */ + n = write(accepted_sock, welcome_msg, welcome_msg_len); if (n < 0) { etiLog.level(warn) << "Error writing to Stats Server socket " << strerror(errno); + close(accepted_sock); + continue; + } + + /* receive command */ + memset(buffer, 0, 256); + int n = read(accepted_sock, buffer, 255); + if (n < 0) { + etiLog.level(warn) << "Error reading from Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + if (strcmp(buffer, "config\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getConfigJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "values\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getValuesJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "state\n") == 0) + { + boost::mutex::scoped_lock lock(m_mutex); + std::string json = getStateJSON(); + n = write(accepted_sock, json.c_str(), json.size()); } + else + { + int len = snprintf(buffer, 256, "Invalid command\n"); + + n = write(accepted_sock, buffer, len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + } + close(accepted_sock); + continue; + } + close(accepted_sock); - continue; } - close(accepted_sock); - } - end_serverthread: - close(m_sock); + m_fault = true; + close(m_sock); + + } + catch (std::exception& e) { + etiLog.level(error) << "Statistics server caught exception: " << e.what(); + m_fault = true; + } } -std::string InputStat::encodeJSON() +std::string InputStat::encodeValuesJSON() { std::ostringstream ss; @@ -295,3 +347,62 @@ std::string InputStat::encodeJSON() return ss.str(); } +std::string InputStat::encodeStateJSON() +{ + std::ostringstream ss; + + ss << "{ \"state\" : "; + + switch (determineState()) { + case NoData: + ss << "\"NoData\""; + break; + case Unstable: + ss << "\"Unstable\""; + break; + case Streaming: + ss << "\"Streaming\""; + break; + default: + ss << "\"Unknown\""; + } + + ss << " }"; + + return ss.str(); +} + +input_state_t InputStat::determineState(void) +{ + time_t now = time(NULL); + input_state_t state; + + /* if the last event was more that INPUT_COUNTER_RESET_TIME + * minutes ago, the timeout has expired. We can reset our + * glitch counter. + */ + if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) { + m_glitch_counter = 0; + } + + // STATE CALCULATION + + /* If the buffer has been empty for more than + * INPUT_NODATA_TIMEOUT, we go to the NoData state. + */ + if (m_buffer_empty && + now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) { + state = NoData; + } + + /* Otherwise, the state depends on the glitch counter */ + else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) { + state = Unstable; + } + else { + state = Streaming; + } + + return state; +} + -- cgit v1.2.3