aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-01-17 15:25:23 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-01-17 15:25:23 +0100
commitedb874717f758475d8fa1b433b71b601d1774e84 (patch)
treec5bd1098e51bdae39d0bad972f64c5ff066ec379
parent0ec9156736c97ac2e6c5b20035634e17ea350c5e (diff)
downloaddabmux-edb874717f758475d8fa1b433b71b601d1774e84.tar.gz
dabmux-edb874717f758475d8fa1b433b71b601d1774e84.tar.bz2
dabmux-edb874717f758475d8fa1b433b71b601d1774e84.zip
add code for TCP statistics server
-rw-r--r--src/StatsServer.cpp268
-rw-r--r--src/StatsServer.h130
-rw-r--r--src/TestStatsServer.cpp26
-rwxr-xr-xsrc/test_statsserver.sh1
4 files changed, 425 insertions, 0 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp
new file mode 100644
index 0000000..1d2984e
--- /dev/null
+++ b/src/StatsServer.cpp
@@ -0,0 +1,268 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2014 Matthias P. Braendli
+ http://mpb.li
+
+ A TCP Socket server that serves statistics for monitoring purposes.
+
+ This server is very easy to integrate with munin http://munin-monitoring.org/
+ */
+/*
+ This file is part of CRC-DabMux.
+
+ CRC-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ CRC-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <errno.h>
+#include <string.h>
+#include <sstream>
+#include "StatsServer.h"
+#include "Log.h"
+
+void StatsServer::registerInput(std::string id)
+{
+ boost::mutex::scoped_lock lock(m_mutex);
+
+ if (m_inputStats.count(id) == 1) {
+ etiLog.level(error) <<
+ "Double registration in Stats Server with id '" <<
+ id << "'";
+ return;
+ }
+
+ InputStat is;
+ m_inputStats[id] = is;
+}
+
+void StatsServer::notifyBuffer(std::string id, long bufsize)
+{
+ boost::mutex::scoped_lock lock(m_mutex);
+
+ if (m_inputStats.count(id) == 0) {
+ etiLog.level(error) <<
+ "Stats Server id '" <<
+ id << "' does was not registered";
+ return;
+ }
+
+ InputStat& is = m_inputStats[id];
+ if (bufsize > is.max_fill_buffer) {
+ is.max_fill_buffer = bufsize;
+ }
+
+ if (bufsize < is.min_fill_buffer ||
+ is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) {
+ is.min_fill_buffer = bufsize;
+ }
+}
+
+void StatsServer::notifyUnderrun(std::string id)
+{
+ boost::mutex::scoped_lock lock(m_mutex);
+
+ if (m_inputStats.count(id) == 0) {
+ etiLog.level(error) <<
+ "Stats Server id '" <<
+ id << "' does was not registered";
+ return;
+ }
+
+ InputStat& is = m_inputStats[id];
+ is.num_underruns++;
+}
+
+void StatsServer::notifyOverrun(std::string id)
+{
+ boost::mutex::scoped_lock lock(m_mutex);
+
+ if (m_inputStats.count(id) == 0) {
+ etiLog.level(error) <<
+ "Stats Server id '" <<
+ id << "' does was not registered";
+ return;
+ }
+
+ InputStat& is = m_inputStats[id];
+ is.num_overruns++;
+}
+
+std::string StatsServer::getConfigJSON()
+{
+ std::ostringstream ss;
+ ss << "{ \"config\" : [\n";
+
+ std::map<std::string,InputStat>::iterator iter;
+ bool first = true;
+ for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter)
+ {
+ std::string id = iter->first;
+
+ if (! first) {
+ ss << ", ";
+ first = false;
+ }
+
+ ss << " \"" << id << "\" ";
+ }
+
+ ss << "] }\n";
+
+ return ss.str();
+}
+
+std::string StatsServer::getValuesJSON()
+{
+ std::ostringstream ss;
+ ss << "{ \"values\" : {\n";
+
+ std::map<std::string,InputStat>::iterator iter;
+ bool first = true;
+ for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter)
+ {
+ const std::string& id = iter->first;
+ InputStat& stats = iter->second;
+
+ if (! first) {
+ ss << " ,\n";
+ first = false;
+ }
+
+ ss << " \"" << id << "\" : ";
+ ss << stats.encodeJSON();
+ stats.reset();
+ }
+
+ ss << "}\n}\n";
+
+ return ss.str();
+}
+void StatsServer::serverThread()
+{
+ int sock, accepted_sock;
+ char buffer[256];
+ char welcome_msg[256];
+ struct sockaddr_in serv_addr, cli_addr;
+ int n;
+
+#ifndef GITVERSION
+#define GITVERSION "-r???"
+#endif
+ int welcome_msg_len = snprintf(welcome_msg, 256,
+ "{ \"service\": \""
+ "%s %s%s Stats Server\" }\n",
+ PACKAGE_NAME, PACKAGE_VERSION, GITVERSION);
+
+
+ sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) {
+ etiLog.level(error) << "Error opening Stats Server socket: " <<
+ strerror(errno);
+ return;
+ }
+
+ memset(&serv_addr, 0, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
+ serv_addr.sin_port = htons(m_listenport);
+
+ if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
+ etiLog.level(error) << "Error binding Stats Server socket: " <<
+ strerror(errno);
+ goto end_serverthread;
+ }
+
+ if (listen(sock, 5) < 0) {
+ etiLog.level(error) << "Error listening on Stats Server socket: " <<
+ strerror(errno);
+ goto end_serverthread;
+ }
+
+ while (m_running)
+ {
+ socklen_t cli_addr_len = sizeof(cli_addr);
+
+ /* Accept actual connection from the client */
+ accepted_sock = accept(sock, (struct sockaddr *)&cli_addr, &cli_addr_len);
+ if (accepted_sock < 0) {
+ etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+ strerror(errno);
+ continue;
+ }
+ /* Send welcome message with version */
+ n = write(accepted_sock, welcome_msg, welcome_msg_len);
+ if (n < 0) {
+ etiLog.level(warn) << "Error writing to Stats Server socket " <<
+ strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ /* receive command */
+ memset(buffer, 256, 0);
+ int n = read(accepted_sock, buffer, 255);
+ if (n < 0) {
+ etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ strerror(errno);
+ close(accepted_sock);
+ continue;
+ }
+
+ if (strcmp(buffer, "config\n") == 0)
+ {
+ std::string json = getConfigJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else if (strcmp(buffer, "values\n") == 0)
+ {
+ std::string json = getValuesJSON();
+ n = write(accepted_sock, json.c_str(), json.size());
+ }
+ else
+ {
+ int len = snprintf(buffer, 256, "Invalid command\n");
+
+ n = write(accepted_sock, buffer, len);
+ if (n < 0) {
+ etiLog.level(warn) << "Error writing to Stats Server socket " <<
+ strerror(errno);
+ }
+ close(accepted_sock);
+ continue;
+ }
+
+ close(accepted_sock);
+ }
+
+end_serverthread:
+ close(sock);
+}
+
+
+std::string InputStat::encodeJSON()
+{
+ std::ostringstream ss;
+
+ ss <<
+ "{ \"inputstat\" : {"
+ "\"min_fill\": " << min_fill_buffer << ", "
+ "\"max_fill\": " << max_fill_buffer << ", "
+ "\"num_underruns\": " << num_underruns << ", "
+ "\"num_overruns\": " << num_overruns <<
+ " } }";
+
+ return ss.str();
+}
+
diff --git a/src/StatsServer.h b/src/StatsServer.h
new file mode 100644
index 0000000..76ca497
--- /dev/null
+++ b/src/StatsServer.h
@@ -0,0 +1,130 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2014 Matthias P. Braendli
+ http://mpb.li
+
+ A TCP Socket server that serves statistics for monitoring purposes.
+
+ This server is very easy to integrate with munin
+ http://munin-monitoring.org/
+ but is not specific to it.
+
+ The TCP Server responds in JSON, and accepts two commands:
+ - config
+ -and-
+ - values
+
+ */
+/*
+ This file is part of CRC-DabMux.
+
+ CRC-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ CRC-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __STATS_SERVER_H
+#define __STATS_SERVER_H
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#include <string>
+#include <map>
+#include <boost/thread.hpp>
+
+#define MIN_FILL_BUFFER_UNDEF (-1)
+
+struct InputStat
+{
+ InputStat() { reset(); }
+
+ // minimum and maximum buffer fill since last reset
+ long min_fill_buffer;
+ long max_fill_buffer;
+
+ // number of overruns and underruns since last reset
+ long num_underruns;
+ long num_overruns;
+
+ void reset()
+ {
+ min_fill_buffer = MIN_FILL_BUFFER_UNDEF;
+ max_fill_buffer = 0;
+
+ num_underruns = 0;
+ num_overruns = 0;
+ }
+
+ std::string encodeJSON();
+};
+
+class StatsServer
+{
+ public:
+ StatsServer(int listen_port) :
+ m_listenport(listen_port),
+ m_running(true),
+ m_thread(&StatsServer::serverThread, this)
+ {}
+
+ void registerInput(std::string id);
+ // The input notifies the StatsServer about a new buffer size
+ void notifyBuffer(std::string id, long bufsize);
+
+ void notifyUnderrun(std::string id);
+ void notifyOverrun(std::string id);
+
+ private:
+ /******* TCP Socket Server ******/
+ // no copying (because of the thread)
+ StatsServer(const StatsServer& other);
+
+ void serverThread();
+
+ int m_listenport;
+
+ // serverThread runs in a separate thread
+ bool m_running;
+ boost::thread m_thread;
+
+ /******* Statistics Data ********/
+ std::map<std::string, InputStat> m_inputStats;
+
+ /* Return a description of the configuration that will
+ * allow to define what graphs to be created
+ *
+ * returns: a JSON encoded configuration
+ */
+ std::string getConfigJSON();
+
+ /* Return the values for the statistics as defined in the configuration
+ *
+ * returns: JSON encoded statistics
+ */
+ std::string getValuesJSON();
+
+ // mutex for accessing the map
+ mutable boost::mutex m_mutex;
+};
+
+#endif
+
diff --git a/src/TestStatsServer.cpp b/src/TestStatsServer.cpp
new file mode 100644
index 0000000..7ec0342
--- /dev/null
+++ b/src/TestStatsServer.cpp
@@ -0,0 +1,26 @@
+#include <stdio.h>
+#include "StatsServer.h"
+
+#define NUMOF(x) (sizeof(x) / sizeof(*x))
+
+int main(int argc, char **argv)
+{
+ int stats_example[] = {25, 24, 22, 21, 56, 56, 54, 53, 51, 45, 42, 39, 34, 30, 24, 15, 8, 4, 1, 0};
+ StatsServer serv(2720);
+
+ serv.registerInput("foo");
+ while (true) {
+ for (int i = 0; i < NUMOF(stats_example); i++) {
+ usleep(400000);
+ serv.notifyBuffer("foo", stats_example[i]);
+ fprintf(stderr, "give %d\n", stats_example[i]);
+
+ if (stats_example[i] == 0) {
+ serv.notifyUnderrun("foo");
+ }
+ }
+ }
+
+ return 0;
+}
+
diff --git a/src/test_statsserver.sh b/src/test_statsserver.sh
new file mode 100755
index 0000000..b240cf9
--- /dev/null
+++ b/src/test_statsserver.sh
@@ -0,0 +1 @@
+clang++ -Wall --include=../config.h StatsServer.cpp TestStatsServer.cpp Log.cpp -lboost_system -lboost_thread -o test && ./test