summaryrefslogtreecommitdiffstats
path: root/src/StatsServer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/StatsServer.cpp')
-rw-r--r--src/StatsServer.cpp345
1 files changed, 228 insertions, 117 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp
index c7cbc9d..eb30ebd 100644
--- a/src/StatsServer.cpp
+++ b/src/StatsServer.cpp
@@ -5,7 +5,8 @@
Copyright (C) 2014 Matthias P. Braendli
http://mpb.li
- A TCP Socket server that serves statistics for monitoring purposes.
+ A TCP Socket server that serves state information and statistics for
+ monitoring purposes.
This server is very easy to integrate with munin http://munin-monitoring.org/
*/
@@ -32,6 +33,8 @@
#include <stdint.h>
#include <limits>
#include <sstream>
+#include <ctime>
+#include <boost/thread.hpp>
#include "StatsServer.h"
#include "Log.h"
@@ -54,73 +57,43 @@ void StatsServer::notifyBuffer(std::string id, long bufsize)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- if (bufsize > is.max_fill_buffer) {
- is.max_fill_buffer = bufsize;
- }
-
- if (bufsize < is.min_fill_buffer ||
- is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) {
- is.min_fill_buffer = bufsize;
- }
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyBuffer(bufsize);
}
void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- if (peak_left > is.peak_left) {
- is.peak_left = peak_left;
- }
-
- if (peak_right > is.peak_right) {
- is.peak_right = peak_right;
- }
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyPeakLevels(peak_left, peak_right);
}
void StatsServer::notifyUnderrun(std::string id)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (m_inputStats.count(id) == 0) {
- etiLog.level(error) <<
- "Stats Server id '" <<
- id << "' does was not registered";
- return;
- }
-
- InputStat& is = m_inputStats[id];
- is.num_underruns++;
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyUnderrun();
}
void StatsServer::notifyOverrun(std::string id)
{
boost::mutex::scoped_lock lock(m_mutex);
+ if (isInputRegistered(id))
+ m_inputStats[id].notifyOverrun();
+}
+
+bool StatsServer::isInputRegistered(std::string& id)
+{
if (m_inputStats.count(id) == 0) {
etiLog.level(error) <<
"Stats Server id '" <<
id << "' does was not registered";
- return;
+ return false;
}
-
- InputStat& is = m_inputStats[id];
- is.num_overruns++;
+ return true;
}
std::string StatsServer::getConfigJSON()
@@ -165,7 +138,7 @@ std::string StatsServer::getValuesJSON()
}
ss << " \"" << id << "\" : ";
- ss << stats.encodeJSON();
+ ss << stats.encodeValuesJSON();
stats.reset();
}
@@ -173,106 +146,185 @@ std::string StatsServer::getValuesJSON()
return ss.str();
}
-void StatsServer::serverThread()
+
+std::string StatsServer::getStateJSON()
{
- int accepted_sock;
- char buffer[256];
- char welcome_msg[256];
- struct sockaddr_in serv_addr, cli_addr;
- int n;
-
- int welcome_msg_len = snprintf(welcome_msg, 256,
- "{ \"service\": \""
- "%s %s Stats Server\" }\n",
- PACKAGE_NAME, PACKAGE_VERSION);
-
-
- m_sock = socket(AF_INET, SOCK_STREAM, 0);
- if (m_sock < 0) {
- etiLog.level(error) << "Error opening Stats Server socket: " <<
- strerror(errno);
- return;
- }
+ std::ostringstream ss;
+ ss << "{\n";
- memset(&serv_addr, 0, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
- serv_addr.sin_port = htons(m_listenport);
+ std::map<std::string,InputStat>::iterator iter;
+ int i = 0;
+ for(iter = m_inputStats.begin(); iter != m_inputStats.end();
+ ++iter, i++)
+ {
+ const std::string& id = iter->first;
+ InputStat& stats = iter->second;
- if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- etiLog.level(error) << "Error binding Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
+ if (i > 0) {
+ ss << " ,\n";
+ }
+
+ ss << " \"" << id << "\" : ";
+ ss << stats.encodeStateJSON();
+ stats.reset();
}
- if (listen(m_sock, 5) < 0) {
- etiLog.level(error) << "Error listening on Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
+ ss << "}\n";
+
+ return ss.str();
+}
+
+void StatsServer::restart()
+{
+ m_restarter_thread = boost::thread(&StatsServer::restart_thread,
+ this, 0);
+}
+
+// This runs in a separate thread, because
+// it would take too long to be done in the main loop
+// thread.
+void StatsServer::restart_thread(long)
+{
+ m_running = false;
+
+ if (m_listenport) {
+ m_thread.interrupt();
+ m_thread.join();
}
- while (m_running)
- {
- socklen_t cli_addr_len = sizeof(cli_addr);
+ m_thread = boost::thread(&StatsServer::serverThread, this);
+}
- /* Accept actual connection from the client */
- accepted_sock = accept(m_sock, (struct sockaddr *)&cli_addr, &cli_addr_len);
- if (accepted_sock < 0) {
- etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+void StatsServer::serverThread()
+{
+ m_fault = false;
+
+ try {
+ int accepted_sock;
+ char buffer[256];
+ char welcome_msg[256];
+ struct sockaddr_in serv_addr, cli_addr;
+ int n;
+
+ int welcome_msg_len = snprintf(welcome_msg, 256,
+ "{ \"service\": \""
+ "%s %s Stats Server\" }\n",
+ PACKAGE_NAME,
+#if defined(GITVERSION)
+ GITVERSION
+#else
+ PACKAGE_VERSION
+#endif
+ );
+
+
+ m_sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (m_sock < 0) {
+ etiLog.level(error) << "Error opening Stats Server socket: " <<
strerror(errno);
- continue;
+ m_fault = true;
+ return;
}
- /* Send welcome message with version */
- n = write(accepted_sock, welcome_msg, welcome_msg_len);
- if (n < 0) {
- etiLog.level(warn) << "Error writing to Stats Server socket " <<
+
+ memset(&serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
+ serv_addr.sin_port = htons(m_listenport);
+
+ if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+ etiLog.level(error) << "Error binding Stats Server socket: " <<
strerror(errno);
- close(accepted_sock);
- continue;
+ goto end_serverthread;
}
- /* receive command */
- memset(buffer, 0, 256);
- int n = read(accepted_sock, buffer, 255);
- if (n < 0) {
- etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ if (listen(m_sock, 5) < 0) {
+ etiLog.level(error) << "Error listening on Stats Server socket: " <<
strerror(errno);
- close(accepted_sock);
- continue;
+ goto end_serverthread;
}
- if (strcmp(buffer, "config\n") == 0)
- {
- std::string json = getConfigJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "values\n") == 0)
- {
- std::string json = getValuesJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else
+ m_running = true;
+
+ while (m_running)
{
- int len = snprintf(buffer, 256, "Invalid command\n");
+ socklen_t cli_addr_len = sizeof(cli_addr);
+
+ /* Accept actual connection from the client */
+ accepted_sock = accept(m_sock,
+ (struct sockaddr *)&cli_addr,
+ &cli_addr_len);
- n = write(accepted_sock, buffer, len);
+ if (accepted_sock < 0) {
+ etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+ strerror(errno);
+ continue;
+ }
+ /* Send welcome message with version */
+ n = write(accepted_sock, welcome_msg, welcome_msg_len);
if (n < 0) {
etiLog.level(warn) << "Error writing to Stats Server socket " <<
strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ /* receive command */
+ memset(buffer, 0, 256);
+ int n = read(accepted_sock, buffer, 255);
+ if (n < 0) {
+ etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ if (strcmp(buffer, "config\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getConfigJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else if (strcmp(buffer, "values\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getValuesJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else if (strcmp(buffer, "state\n") == 0)
+ {
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::string json = getStateJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
}
+ else
+ {
+ int len = snprintf(buffer, 256, "Invalid command\n");
+
+ n = write(accepted_sock, buffer, len);
+ if (n < 0) {
+ etiLog.level(warn) << "Error writing to Stats Server socket " <<
+ strerror(errno);
+ }
+ close(accepted_sock);
+ continue;
+ }
+
close(accepted_sock);
- continue;
}
- close(accepted_sock);
- }
-
end_serverthread:
- close(m_sock);
+ m_fault = true;
+ close(m_sock);
+
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) << "Statistics server caught exception: " << e.what();
+ m_fault = true;
+ }
}
-std::string InputStat::encodeJSON()
+std::string InputStat::encodeValuesJSON()
{
std::ostringstream ss;
@@ -295,3 +347,62 @@ std::string InputStat::encodeJSON()
return ss.str();
}
+std::string InputStat::encodeStateJSON()
+{
+ std::ostringstream ss;
+
+ ss << "{ \"state\" : ";
+
+ switch (determineState()) {
+ case NoData:
+ ss << "\"NoData\"";
+ break;
+ case Unstable:
+ ss << "\"Unstable\"";
+ break;
+ case Streaming:
+ ss << "\"Streaming\"";
+ break;
+ default:
+ ss << "\"Unknown\"";
+ }
+
+ ss << " }";
+
+ return ss.str();
+}
+
+input_state_t InputStat::determineState(void)
+{
+ time_t now = time(NULL);
+ input_state_t state;
+
+ /* if the last event was more that INPUT_COUNTER_RESET_TIME
+ * minutes ago, the timeout has expired. We can reset our
+ * glitch counter.
+ */
+ if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) {
+ m_glitch_counter = 0;
+ }
+
+ // STATE CALCULATION
+
+ /* If the buffer has been empty for more than
+ * INPUT_NODATA_TIMEOUT, we go to the NoData state.
+ */
+ if (m_buffer_empty &&
+ now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) {
+ state = NoData;
+ }
+
+ /* Otherwise, the state depends on the glitch counter */
+ else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) {
+ state = Unstable;
+ }
+ else {
+ state = Streaming;
+ }
+
+ return state;
+}
+