aboutsummaryrefslogtreecommitdiffstats
path: root/src/StatsServer.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-05-09 16:07:01 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-05-09 16:07:01 +0200
commit0e5797c291d29587d5ea5beebe6430cb041903bd (patch)
tree1e7d6a833d983f27de0b0f0b426ca070825ed301 /src/StatsServer.cpp
parentebc799479b15f420702bdfc43f1e6f71160c5977 (diff)
downloaddabmux-0e5797c291d29587d5ea5beebe6430cb041903bd.tar.gz
dabmux-0e5797c291d29587d5ea5beebe6430cb041903bd.tar.bz2
dabmux-0e5797c291d29587d5ea5beebe6430cb041903bd.zip
Add input state monitoring, change munin graphs
StatsServer now determines the input states. Also, the underruns and overruns are counters in munin now, they are not reset anymore.
Diffstat (limited to 'src/StatsServer.cpp')
-rw-r--r--src/StatsServer.cpp345
1 files changed, 228 insertions, 117 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp
index c7cbc9d..eb30ebd 100644
--- a/src/StatsServer.cpp
+++ b/src/StatsServer.cpp
@@ -5,7 +5,8 @@
Copyright (C) 2014 Matthias P. Braendli
http://mpb.li
- A TCP Socket server that serves statistics for monitoring purposes.
+ A TCP Socket server that serves state information and statistics for
+ monitoring purposes.
This server is very easy to integrate with munin http://munin-monitoring.org/
*/
@@ -32,6 +33,8 @@
#include <stdint.h>
#include <limits>
#include <sstream>
+#include <ctime>
+#include <boost/thread.hpp>
#include "StatsServer.h"
#include "Log.h"
@@ -54,73 +57,43 @@ void StatsServer::notifyBuffer(std::string id, long bufsize)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- if (bufsize > is.max_fill_buffer) {
- is.max_fill_buffer = bufsize;
- }
-
- if (bufsize < is.min_fill_buffer ||
- is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) {
- is.min_fill_buffer = bufsize;
- }
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyBuffer(bufsize);
}
void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- if (peak_left > is.peak_left) {
- is.peak_left = peak_left;
- }
-
- if (peak_right > is.peak_right) {
- is.peak_right = peak_right;
- }
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyPeakLevels(peak_left, peak_right);
}
void StatsServer::notifyUnderrun(std::string id)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- is.num_underruns++;
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyUnderrun();
}
void StatsServer::notifyOverrun(std::string id)
{
boost::mutex::scoped_lock lock(m_mutex);
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyOverrun();
+}
+
+bool StatsServer::isInputRegistered(std::string& id)
+{
if (m_inputStats.count(id) == 0) {
etiLog.level(error) <<
"Stats Server id '" <<
id << "' does was not registered";
- return;
+ return false;
}
-
- InputStat& is = m_inputStats[id];
- is.num_overruns++;
+ return true;
}
std::string StatsServer::getConfigJSON()
@@ -165,7 +138,7 @@ std::string StatsServer::getValuesJSON()
}
ss << " \"" << id << "\" : ";
- ss << stats.encodeJSON();
+ ss << stats.encodeValuesJSON();
stats.reset();
}
@@ -173,106 +146,185 @@ std::string StatsServer::getValuesJSON()
return ss.str();
}
-void StatsServer::serverThread()
+
+std::string StatsServer::getStateJSON()
{
- int accepted_sock;
- char buffer[256];
- char welcome_msg[256];
- struct sockaddr_in serv_addr, cli_addr;
- int n;
-
- int welcome_msg_len = snprintf(welcome_msg, 256,
- "{ \"service\": \""
- "%s %s Stats Server\" }\n",
- PACKAGE_NAME, PACKAGE_VERSION);
-
-
- m_sock = socket(AF_INET, SOCK_STREAM, 0);
- if (m_sock < 0) {
- etiLog.level(error) << "Error opening Stats Server socket: " <<
- strerror(errno);
- return;
- }
+ std::ostringstream ss;
+ ss << "{\n";
- memset(&serv_addr, 0, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
- serv_addr.sin_port = htons(m_listenport);
+ std::map<std::string,InputStat>::iterator iter;
+ int i = 0;
+ for(iter = m_inputStats.begin(); iter != m_inputStats.end();
+ ++iter, i++)
+ {
+ const std::string& id = iter->first;
+ InputStat& stats = iter->second;
- if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- etiLog.level(error) << "Error binding Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
+ if (i > 0) {
+ ss << " ,\n";
+ }
+
+ ss << " \"" << id << "\" : ";
+ ss << stats.encodeStateJSON();
+ stats.reset();
}
- if (listen(m_sock, 5) < 0) {
- etiLog.level(error) << "Error listening on Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
+ ss << "}\n";
+
+ return ss.str();
+}
+
+void StatsServer::restart()
+{
+ m_restarter_thread = boost::thread(&StatsServer::restart_thread,
+ this, 0);
+}
+
+// This runs in a separate thread, because
+// it would take too long to be done in the main loop
+// thread.
+void StatsServer::restart_thread(long)
+{
+ m_running = false;
+
+ if (m_listenport) {
+ m_thread.interrupt();
+ m_thread.join();
}
- while (m_running)
- {
- socklen_t cli_addr_len = sizeof(cli_addr);
+ m_thread = boost::thread(&StatsServer::serverThread, this);
+}
- /* Accept actual connection from the client */
- accepted_sock = accept(m_sock, (struct sockaddr *)&cli_addr, &cli_addr_len);
- if (accepted_sock < 0) {
- etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+void StatsServer::serverThread()
+{
+ m_fault = false;
+
+ try {
+ int accepted_sock;
+ char buffer[256];
+ char welcome_msg[256];
+ struct sockaddr_in serv_addr, cli_addr;
+ int n;
+
+ int welcome_msg_len = snprintf(welcome_msg, 256,
+ "{ \"service\": \""
+ "%s %s Stats Server\" }\n",
+ PACKAGE_NAME,
+#if defined(GITVERSION)
+ GITVERSION
+#else
+ PACKAGE_VERSION
+#endif
+ );
+
+
+ m_sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (m_sock < 0) {
+ etiLog.level(error) << "Error opening Stats Server socket: " <<
strerror(errno);
- continue;
+ m_fault = true;
+ return;
}
- /* Send welcome message with version */
- n = write(accepted_sock, welcome_msg, welcome_msg_len);
- if (n < 0) {
- etiLog.level(warn) << "Error writing to Stats Server socket " <<
+
+ memset(&serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
+ serv_addr.sin_port = htons(m_listenport);
+
+ if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+ etiLog.level(error) << "Error binding Stats Server socket: " <<
strerror(errno);
- close(accepted_sock);
- continue;
+ goto end_serverthread;
}
- /* receive command */
- memset(buffer, 0, 256);
- int n = read(accepted_sock, buffer, 255);
- if (n < 0) {
- etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ if (listen(m_sock, 5) < 0) {
+ etiLog.level(error) << "Error listening on Stats Server socket: " <<
strerror(errno);
- close(accepted_sock);
- continue;
+ goto end_serverthread;
}
- if (strcmp(buffer, "config\n") == 0)
- {
- std::string json = getConfigJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "values\n") == 0)
- {
- std::string json = getValuesJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else
+ m_running = true;
+
+ while (m_running)
{
- int len = snprintf(buffer, 256, "Invalid command\n");
+ socklen_t cli_addr_len = sizeof(cli_addr);
+
+ /* Accept actual connection from the client */
+ accepted_sock = accept(m_sock,
+ (struct sockaddr *)&cli_addr,
+ &cli_addr_len);
- n = write(accepted_sock, buffer, len);
+ if (accepted_sock < 0) {
+ etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+ strerror(errno);
+ continue;
+ }
+ /* Send welcome message with version */
+ n = write(accepted_sock, welcome_msg, welcome_msg_len);
if (n < 0) {
etiLog.level(warn) << "Error writing to Stats Server socket " <<
strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ /* receive command */
+ memset(buffer, 0, 256);
+ int n = read(accepted_sock, buffer, 255);
+ if (n < 0) {
+ etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ if (strcmp(buffer, "config\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getConfigJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else if (strcmp(buffer, "values\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getValuesJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else if (strcmp(buffer, "state\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getStateJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
}
+ else
+ {
+ int len = snprintf(buffer, 256, "Invalid command\n");
+
+ n = write(accepted_sock, buffer, len);
+ if (n < 0) {
+ etiLog.level(warn) << "Error writing to Stats Server socket " <<
+ strerror(errno);
+ }
+ close(accepted_sock);
+ continue;
+ }
+
close(accepted_sock);
- continue;
}
- close(accepted_sock);
- }
-
end_serverthread:
- close(m_sock);
+ m_fault = true;
+ close(m_sock);
+
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) << "Statistics server caught exception: " << e.what();
+ m_fault = true;
+ }
}
-std::string InputStat::encodeJSON()
+std::string InputStat::encodeValuesJSON()
{
std::ostringstream ss;
@@ -295,3 +347,62 @@ std::string InputStat::encodeJSON()
return ss.str();
}
+std::string InputStat::encodeStateJSON()
+{
+ std::ostringstream ss;
+
+ ss << "{ \"state\" : ";
+
+ switch (determineState()) {
+ case NoData:
+ ss << "\"NoData\"";
+ break;
+ case Unstable:
+ ss << "\"Unstable\"";
+ break;
+ case Streaming:
+ ss << "\"Streaming\"";
+ break;
+ default:
+ ss << "\"Unknown\"";
+ }
+
+ ss << " }";
+
+ return ss.str();
+}
+
+input_state_t InputStat::determineState(void)
+{
+ time_t now = time(NULL);
+ input_state_t state;
+
+ /* if the last event was more that INPUT_COUNTER_RESET_TIME
+ * minutes ago, the timeout has expired. We can reset our
+ * glitch counter.
+ */
+ if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) {
+ m_glitch_counter = 0;
+ }
+
+ // STATE CALCULATION
+
+ /* If the buffer has been empty for more than
+ * INPUT_NODATA_TIMEOUT, we go to the NoData state.
+ */
+ if (m_buffer_empty &&
+ now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) {
+ state = NoData;
+ }
+
+ /* Otherwise, the state depends on the glitch counter */
+ else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) {
+ state = Unstable;
+ }
+ else {
+ state = Streaming;
+ }
+
+ return state;
+}
+