aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdoc/stats_dabmux_multi.py2
-rw-r--r--src/DabMux.cpp7
-rw-r--r--src/StatsServer.cpp345
-rw-r--r--src/StatsServer.h232
4 files changed, 443 insertions, 143 deletions
diff --git a/doc/stats_dabmux_multi.py b/doc/stats_dabmux_multi.py
index 7697b6e..7c21d84 100755
--- a/doc/stats_dabmux_multi.py
+++ b/doc/stats_dabmux_multi.py
@@ -43,10 +43,12 @@ underruns.info Number of underruns
underruns.label Number of underruns
underruns.min 0
underruns.warning 0:0
+underruns.type COUNTER
overruns.info Number of overruns
overruns.label Number of overruns
overruns.min 0
overruns.warning 0:0
+overruns.type COUNTER
multigraph audio_levels_{ident}
graph_title Contribution {ident} audio level (peak)
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index c090738..e698e89 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -1896,6 +1896,13 @@ int main(int argc, char *argv[])
etiLog.level(warn) << "Detected Remote Control fault, restarting it";
rc->restart();
}
+
+ /* Same for statistics server */
+ if (global_stats && fc->FCT == 249 && global_stats->fault_detected()) {
+ etiLog.level(warn) <<
+ "Detected Statistics Server fault, restarting it";
+ global_stats->restart();
+ }
}
EXIT:
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp
index c7cbc9d..eb30ebd 100644
--- a/src/StatsServer.cpp
+++ b/src/StatsServer.cpp
@@ -5,7 +5,8 @@
Copyright (C) 2014 Matthias P. Braendli
http://mpb.li
- A TCP Socket server that serves statistics for monitoring purposes.
+ A TCP Socket server that serves state information and statistics for
+ monitoring purposes.
This server is very easy to integrate with munin http://munin-monitoring.org/
*/
@@ -32,6 +33,8 @@
#include <stdint.h>
#include <limits>
#include <sstream>
+#include <ctime>
+#include <boost/thread.hpp>
#include "StatsServer.h"
#include "Log.h"
@@ -54,73 +57,43 @@ void StatsServer::notifyBuffer(std::string id, long bufsize)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- if (bufsize > is.max_fill_buffer) {
- is.max_fill_buffer = bufsize;
- }
-
- if (bufsize < is.min_fill_buffer ||
- is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) {
- is.min_fill_buffer = bufsize;
- }
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyBuffer(bufsize);
}
void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- if (peak_left > is.peak_left) {
- is.peak_left = peak_left;
- }
-
- if (peak_right > is.peak_right) {
- is.peak_right = peak_right;
- }
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyPeakLevels(peak_left, peak_right);
}
void StatsServer::notifyUnderrun(std::string id)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- is.num_underruns++;
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyUnderrun();
}
void StatsServer::notifyOverrun(std::string id)
{
boost::mutex::scoped_lock lock(m_mutex);
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyOverrun();
+}
+
+bool StatsServer::isInputRegistered(std::string& id)
+{
if (m_inputStats.count(id) == 0) {
etiLog.level(error) <<
"Stats Server id '" <<
id << "' does was not registered";
- return;
+ return false;
}
-
- InputStat& is = m_inputStats[id];
- is.num_overruns++;
+ return true;
}
std::string StatsServer::getConfigJSON()
@@ -165,7 +138,7 @@ std::string StatsServer::getValuesJSON()
}
ss << " \"" << id << "\" : ";
- ss << stats.encodeJSON();
+ ss << stats.encodeValuesJSON();
stats.reset();
}
@@ -173,106 +146,185 @@ std::string StatsServer::getValuesJSON()
return ss.str();
}
-void StatsServer::serverThread()
+
+std::string StatsServer::getStateJSON()
{
- int accepted_sock;
- char buffer[256];
- char welcome_msg[256];
- struct sockaddr_in serv_addr, cli_addr;
- int n;
-
- int welcome_msg_len = snprintf(welcome_msg, 256,
- "{ \"service\": \""
- "%s %s Stats Server\" }\n",
- PACKAGE_NAME, PACKAGE_VERSION);
-
-
- m_sock = socket(AF_INET, SOCK_STREAM, 0);
- if (m_sock < 0) {
- etiLog.level(error) << "Error opening Stats Server socket: " <<
- strerror(errno);
- return;
- }
+ std::ostringstream ss;
+ ss << "{\n";
- memset(&serv_addr, 0, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
- serv_addr.sin_port = htons(m_listenport);
+ std::map<std::string,InputStat>::iterator iter;
+ int i = 0;
+ for(iter = m_inputStats.begin(); iter != m_inputStats.end();
+ ++iter, i++)
+ {
+ const std::string& id = iter->first;
+ InputStat& stats = iter->second;
- if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- etiLog.level(error) << "Error binding Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
+ if (i > 0) {
+ ss << " ,\n";
+ }
+
+ ss << " \"" << id << "\" : ";
+ ss << stats.encodeStateJSON();
+ stats.reset();
}
- if (listen(m_sock, 5) < 0) {
- etiLog.level(error) << "Error listening on Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
+ ss << "}\n";
+
+ return ss.str();
+}
+
+void StatsServer::restart()
+{
+ m_restarter_thread = boost::thread(&StatsServer::restart_thread,
+ this, 0);
+}
+
+// This runs in a separate thread, because
+// it would take too long to be done in the main loop
+// thread.
+void StatsServer::restart_thread(long)
+{
+ m_running = false;
+
+ if (m_listenport) {
+ m_thread.interrupt();
+ m_thread.join();
}
- while (m_running)
- {
- socklen_t cli_addr_len = sizeof(cli_addr);
+ m_thread = boost::thread(&StatsServer::serverThread, this);
+}
- /* Accept actual connection from the client */
- accepted_sock = accept(m_sock, (struct sockaddr *)&cli_addr, &cli_addr_len);
- if (accepted_sock < 0) {
- etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+void StatsServer::serverThread()
+{
+ m_fault = false;
+
+ try {
+ int accepted_sock;
+ char buffer[256];
+ char welcome_msg[256];
+ struct sockaddr_in serv_addr, cli_addr;
+ int n;
+
+ int welcome_msg_len = snprintf(welcome_msg, 256,
+ "{ \"service\": \""
+ "%s %s Stats Server\" }\n",
+ PACKAGE_NAME,
+#if defined(GITVERSION)
+ GITVERSION
+#else
+ PACKAGE_VERSION
+#endif
+ );
+
+
+ m_sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (m_sock < 0) {
+ etiLog.level(error) << "Error opening Stats Server socket: " <<
strerror(errno);
- continue;
+ m_fault = true;
+ return;
}
- /* Send welcome message with version */
- n = write(accepted_sock, welcome_msg, welcome_msg_len);
- if (n < 0) {
- etiLog.level(warn) << "Error writing to Stats Server socket " <<
+
+ memset(&serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
+ serv_addr.sin_port = htons(m_listenport);
+
+ if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+ etiLog.level(error) << "Error binding Stats Server socket: " <<
strerror(errno);
- close(accepted_sock);
- continue;
+ goto end_serverthread;
}
- /* receive command */
- memset(buffer, 0, 256);
- int n = read(accepted_sock, buffer, 255);
- if (n < 0) {
- etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ if (listen(m_sock, 5) < 0) {
+ etiLog.level(error) << "Error listening on Stats Server socket: " <<
strerror(errno);
- close(accepted_sock);
- continue;
+ goto end_serverthread;
}
- if (strcmp(buffer, "config\n") == 0)
- {
- std::string json = getConfigJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "values\n") == 0)
- {
- std::string json = getValuesJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else
+ m_running = true;
+
+ while (m_running)
{
- int len = snprintf(buffer, 256, "Invalid command\n");
+ socklen_t cli_addr_len = sizeof(cli_addr);
+
+ /* Accept actual connection from the client */
+ accepted_sock = accept(m_sock,
+ (struct sockaddr *)&cli_addr,
+ &cli_addr_len);
- n = write(accepted_sock, buffer, len);
+ if (accepted_sock < 0) {
+ etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+ strerror(errno);
+ continue;
+ }
+ /* Send welcome message with version */
+ n = write(accepted_sock, welcome_msg, welcome_msg_len);
if (n < 0) {
etiLog.level(warn) << "Error writing to Stats Server socket " <<
strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ /* receive command */
+ memset(buffer, 0, 256);
+ int n = read(accepted_sock, buffer, 255);
+ if (n < 0) {
+ etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ if (strcmp(buffer, "config\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getConfigJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else if (strcmp(buffer, "values\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getValuesJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else if (strcmp(buffer, "state\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getStateJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
}
+ else
+ {
+ int len = snprintf(buffer, 256, "Invalid command\n");
+
+ n = write(accepted_sock, buffer, len);
+ if (n < 0) {
+ etiLog.level(warn) << "Error writing to Stats Server socket " <<
+ strerror(errno);
+ }
+ close(accepted_sock);
+ continue;
+ }
+
close(accepted_sock);
- continue;
}
- close(accepted_sock);
- }
-
end_serverthread:
- close(m_sock);
+ m_fault = true;
+ close(m_sock);
+
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) << "Statistics server caught exception: " << e.what();
+ m_fault = true;
+ }
}
-std::string InputStat::encodeJSON()
+std::string InputStat::encodeValuesJSON()
{
std::ostringstream ss;
@@ -295,3 +347,62 @@ std::string InputStat::encodeJSON()
return ss.str();
}
+std::string InputStat::encodeStateJSON()
+{
+ std::ostringstream ss;
+
+ ss << "{ \"state\" : ";
+
+ switch (determineState()) {
+ case NoData:
+ ss << "\"NoData\"";
+ break;
+ case Unstable:
+ ss << "\"Unstable\"";
+ break;
+ case Streaming:
+ ss << "\"Streaming\"";
+ break;
+ default:
+ ss << "\"Unknown\"";
+ }
+
+ ss << " }";
+
+ return ss.str();
+}
+
+input_state_t InputStat::determineState(void)
+{
+ time_t now = time(NULL);
+ input_state_t state;
+
+ /* if the last event was more that INPUT_COUNTER_RESET_TIME
+ * minutes ago, the timeout has expired. We can reset our
+ * glitch counter.
+ */
+ if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) {
+ m_glitch_counter = 0;
+ }
+
+ // STATE CALCULATION
+
+ /* If the buffer has been empty for more than
+ * INPUT_NODATA_TIMEOUT, we go to the NoData state.
+ */
+ if (m_buffer_empty &&
+ now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) {
+ state = NoData;
+ }
+
+ /* Otherwise, the state depends on the glitch counter */
+ else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) {
+ state = Unstable;
+ }
+ else {
+ state = Streaming;
+ }
+
+ return state;
+}
+
diff --git a/src/StatsServer.h b/src/StatsServer.h
index 9eb08df..1a2993a 100644
--- a/src/StatsServer.h
+++ b/src/StatsServer.h
@@ -5,7 +5,8 @@
Copyright (C) 2014 Matthias P. Braendli
http://mpb.li
- A TCP Socket server that serves statistics for monitoring purposes.
+ A TCP Socket server that serves state information and statistics for
+ monitoring purposes.
This server is very easy to integrate with munin
http://munin-monitoring.org/
@@ -13,8 +14,11 @@
The TCP Server responds in JSON, and accepts two commands:
- config
- -and-
- values
+ Inspired by the munin equivalent
+
+ - state
+ Returns the state of each input
*/
/*
@@ -50,47 +54,207 @@
#include <string>
#include <map>
#include <boost/thread.hpp>
+#include <ctime>
#define MIN_FILL_BUFFER_UNDEF (-1)
-struct InputStat
+/*** State handing ***/
+/* An input can be in one of the following three states:
+ */
+enum input_state_t
+{
+ /* The input is waiting for data, all buffers are empty */
+ NoData,
+
+ /* The input is running, but has seen many underruns or overruns */
+ Unstable,
+
+ /* The input is running stable */
+ Streaming
+};
+
+
+/* The delay after which the glitch counter is reset
+ */
+#define INPUT_COUNTER_RESET_TIME 30 // minutes
+
+/* How many glitches we tolerate in Streaming state before
+ * we consider the input Unstable
+ */
+#define INPUT_UNSTABLE_THRESHOLD 3
+
+/* For how long the input buffers must be empty before we move an input to the
+ * NoData state.
+ */
+#define INPUT_NODATA_TIMEOUT 30 // seconds
+
+/* An example of how the state changes work.
+ * The timeout is set to expire in 30 minutes
+ * at each under-/overrun.
+ *
+ * The glitch counter is increased by one for each glitch (can be a
+ * saturating counter), and set to zero when the counter timeout expires.
+ *
+ * The state is then simply depending on the glitch counter value.
+ *
+ * Graphical example:
+
+ state STREAMING | UNSTABLE | STREAMING
+ xruns U U U
+ glitch
+ counter 0 1 2 3 0
+ reset
+ timeout \ |\ |\ |\
+ \ | \ | \ | \
+ \ | \ | \ | \
+ \| \ | \| \
+ ` \| ` \
+ ` \
+ \
+ \
+ \
+ \
+ timeout expires ___________________\
+ <--30min-->
+ */
+
+/* InputStat takes care of
+ * - saving the statistics for graphing
+ * - calculating the state of the input for monitoring
+ */
+class InputStat
{
- InputStat() { reset(); }
+ public:
+ InputStat() {
+ /* Statistics */
+ num_underruns = 0;
+ num_overruns = 0;
+
+ /* State handling */
+ time_t now = time(NULL);
+ m_time_last_event = now;
+ m_time_last_buffer_nonempty = 0;
+ m_buffer_empty = true;
+ m_glitch_counter = 0;
+
+ reset();
+ }
+
+
+ // Gets called each time the statistics are transmitted,
+ // and resets the counters to zero
+ void reset(void)
+ {
+ min_fill_buffer = MIN_FILL_BUFFER_UNDEF;
+ max_fill_buffer = 0;
+
+ peak_left = 0;
+ peak_right = 0;
+ }
+
+ /* This function is called for every frame read by
+ * the multiplexer
+ */
+ void notifyBuffer(long bufsize)
+ {
+ // Statistics
+ if (bufsize > max_fill_buffer) {
+ max_fill_buffer = bufsize;
+ }
+
+ if (bufsize < min_fill_buffer ||
+ min_fill_buffer == MIN_FILL_BUFFER_UNDEF) {
+ min_fill_buffer = bufsize;
+ }
+
+ // State
+ m_buffer_empty = (bufsize == 0);
+ if (!m_buffer_empty) {
+ m_time_last_buffer_nonempty = time(NULL);
+ }
+ }
- // minimum and maximum buffer fill since last reset
- long min_fill_buffer;
- long max_fill_buffer;
+ void notifyPeakLevels(int peak_left, int peak_right)
+ {
+ // Statistics
+ if (peak_left > this->peak_left) {
+ this->peak_left = peak_left;
+ }
+
+ if (peak_right > this->peak_right) {
+ this->peak_right = peak_right;
+ }
+
+ // State
+ // TODO add silence detection
+ }
- // number of overruns and underruns since last reset
- long num_underruns;
- long num_overruns;
+ void notifyUnderrun(void)
+ {
+ // Statistics
+ num_underruns++;
- // peak audio levels (linear 16-bit PCM) for the two channels
- int peak_left;
- int peak_right;
+ // State
+ m_time_last_event = time(NULL);
+ if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) {
+ m_glitch_counter++;
+ }
+ }
- void reset()
- {
- min_fill_buffer = MIN_FILL_BUFFER_UNDEF;
- max_fill_buffer = 0;
+ void notifyOverrun(void)
+ {
+ // Statistics
+ num_overruns++;
- num_underruns = 0;
- num_overruns = 0;
+ // State
+ m_time_last_event = time(NULL);
+ if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) {
+ m_glitch_counter++;
+ }
+ }
- peak_left = 0;
- peak_right = 0;
- }
+ std::string encodeValuesJSON(void);
+
+ std::string encodeStateJSON(void);
+
+ input_state_t determineState(void);
+
+ private:
+ /************ STATISTICS ***********/
+ // minimum and maximum buffer fill since last reset
+ long min_fill_buffer;
+ long max_fill_buffer;
+
+ // counter of number of overruns and underruns since startup
+ uint32_t num_underruns;
+ uint32_t num_overruns;
+
+ // peak audio levels (linear 16-bit PCM) for the two channels
+ int peak_left;
+ int peak_right;
+
+ /************* STATE ***************/
+ /* Variables used for determining the input state */
+ int m_glitch_counter; // saturating counter
+ time_t m_time_last_event;
+ time_t m_time_last_buffer_nonempty;
+ bool m_buffer_empty;
- std::string encodeJSON();
};
class StatsServer
{
public:
- StatsServer() : m_listenport(0), m_running(false) {}
+ StatsServer() :
+ m_listenport(0),
+ m_running(false),
+ m_fault(false)
+ { }
+
StatsServer(int listen_port) :
m_listenport(listen_port),
- m_running(true),
+ m_running(false),
+ m_fault(false),
m_thread(&StatsServer::serverThread, this)
{
m_sock = 0;
@@ -99,6 +263,7 @@ class StatsServer
~StatsServer()
{
m_running = false;
+ m_fault = false;
if (m_sock) {
close(m_sock);
}
@@ -107,6 +272,9 @@ class StatsServer
void registerInput(std::string id);
+ bool fault_detected() { return m_fault; }
+ void restart(void);
+
/* The following notify functions are used by the input to
* inform the StatsServer about new values
*/
@@ -116,17 +284,23 @@ class StatsServer
void notifyOverrun(std::string id);
private:
+ void restart_thread(long);
+
/******* TCP Socket Server ******/
// no copying (because of the thread)
StatsServer(const StatsServer& other);
- void serverThread();
+ void serverThread(void);
+
+ bool isInputRegistered(std::string& id);
int m_listenport;
// serverThread runs in a separate thread
bool m_running;
+ bool m_fault;
boost::thread m_thread;
+ boost::thread m_restarter_thread;
int m_sock;
@@ -146,6 +320,12 @@ class StatsServer
*/
std::string getValuesJSON();
+ /* Return the state of each input
+ *
+ * returns: JSON encoded state
+ */
+ std::string getStateJSON();
+
// mutex for accessing the map
mutable boost::mutex m_mutex;
};