diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-17 15:25:23 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-17 15:25:23 +0100 |
commit | edb874717f758475d8fa1b433b71b601d1774e84 (patch) | |
tree | c5bd1098e51bdae39d0bad972f64c5ff066ec379 | |
parent | 0ec9156736c97ac2e6c5b20035634e17ea350c5e (diff) | |
download | dabmux-edb874717f758475d8fa1b433b71b601d1774e84.tar.gz dabmux-edb874717f758475d8fa1b433b71b601d1774e84.tar.bz2 dabmux-edb874717f758475d8fa1b433b71b601d1774e84.zip |
add code for TCP statistics server
-rw-r--r-- | src/StatsServer.cpp | 268 | ||||
-rw-r--r-- | src/StatsServer.h | 130 | ||||
-rw-r--r-- | src/TestStatsServer.cpp | 26 | ||||
-rwxr-xr-x | src/test_statsserver.sh | 1 |
4 files changed, 425 insertions, 0 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp new file mode 100644 index 0000000..1d2984e --- /dev/null +++ b/src/StatsServer.cpp @@ -0,0 +1,268 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2014 Matthias P. Braendli + http://mpb.li + + A TCP Socket server that serves statistics for monitoring purposes. + + This server is very easy to integrate with munin http://munin-monitoring.org/ + */ +/* + This file is part of CRC-DabMux. + + CRC-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <errno.h> +#include <string.h> +#include <sstream> +#include "StatsServer.h" +#include "Log.h" + +void StatsServer::registerInput(std::string id) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 1) { + etiLog.level(error) << + "Double registration in Stats Server with id '" << + id << "'"; + return; + } + + InputStat is; + m_inputStats[id] = is; +} + +void StatsServer::notifyBuffer(std::string id, long bufsize) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 0) { + etiLog.level(error) << + "Stats Server id '" << + id << "' does was not registered"; + return; + } + + InputStat& is = m_inputStats[id]; + if (bufsize > is.max_fill_buffer) { + is.max_fill_buffer = bufsize; + } + + if (bufsize < is.min_fill_buffer || + is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { + is.min_fill_buffer = bufsize; + } +} + +void StatsServer::notifyUnderrun(std::string id) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 0) { + etiLog.level(error) << + "Stats Server id '" << + id << "' does was not registered"; + return; + } + + InputStat& is = m_inputStats[id]; + is.num_underruns++; +} + +void StatsServer::notifyOverrun(std::string id) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 0) { + etiLog.level(error) << + "Stats Server id '" << + id << "' does was not registered"; + return; + } + + InputStat& is = m_inputStats[id]; + is.num_overruns++; +} + +std::string StatsServer::getConfigJSON() +{ + std::ostringstream ss; + ss << "{ \"config\" : [\n"; + + std::map<std::string,InputStat>::iterator iter; + bool first = true; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter) + { + std::string id = iter->first; + + if (! first) { + ss << ", "; + first = false; + } + + ss << " \"" << id << "\" "; + } + + ss << "] }\n"; + + return ss.str(); +} + +std::string StatsServer::getValuesJSON() +{ + std::ostringstream ss; + ss << "{ \"values\" : {\n"; + + std::map<std::string,InputStat>::iterator iter; + bool first = true; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter) + { + const std::string& id = iter->first; + InputStat& stats = iter->second; + + if (! first) { + ss << " ,\n"; + first = false; + } + + ss << " \"" << id << "\" : "; + ss << stats.encodeJSON(); + stats.reset(); + } + + ss << "}\n}\n"; + + return ss.str(); +} +void StatsServer::serverThread() +{ + int sock, accepted_sock; + char buffer[256]; + char welcome_msg[256]; + struct sockaddr_in serv_addr, cli_addr; + int n; + +#ifndef GITVERSION +#define GITVERSION "-r???" +#endif + int welcome_msg_len = snprintf(welcome_msg, 256, + "{ \"service\": \"" + "%s %s%s Stats Server\" }\n", + PACKAGE_NAME, PACKAGE_VERSION, GITVERSION); + + + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + etiLog.level(error) << "Error opening Stats Server socket: " << + strerror(errno); + return; + } + + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 + serv_addr.sin_port = htons(m_listenport); + + if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + etiLog.level(error) << "Error binding Stats Server socket: " << + strerror(errno); + goto end_serverthread; + } + + if (listen(sock, 5) < 0) { + etiLog.level(error) << "Error listening on Stats Server socket: " << + strerror(errno); + goto end_serverthread; + } + + while (m_running) + { + socklen_t cli_addr_len = sizeof(cli_addr); + + /* Accept actual connection from the client */ + accepted_sock = accept(sock, (struct sockaddr *)&cli_addr, &cli_addr_len); + if (accepted_sock < 0) { + etiLog.level(warn) << "Stats Server cound not accept connection: " << + strerror(errno); + continue; + } + /* Send welcome message with version */ + n = write(accepted_sock, welcome_msg, welcome_msg_len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + /* receive command */ + memset(buffer, 256, 0); + int n = read(accepted_sock, buffer, 255); + if (n < 0) { + etiLog.level(warn) << "Error reading from Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + if (strcmp(buffer, "config\n") == 0) + { + std::string json = getConfigJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "values\n") == 0) + { + std::string json = getValuesJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else + { + int len = snprintf(buffer, 256, "Invalid command\n"); + + n = write(accepted_sock, buffer, len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + } + close(accepted_sock); + continue; + } + + close(accepted_sock); + } + +end_serverthread: + close(sock); +} + + +std::string InputStat::encodeJSON() +{ + std::ostringstream ss; + + ss << + "{ \"inputstat\" : {" + "\"min_fill\": " << min_fill_buffer << ", " + "\"max_fill\": " << max_fill_buffer << ", " + "\"num_underruns\": " << num_underruns << ", " + "\"num_overruns\": " << num_overruns << + " } }"; + + return ss.str(); +} + diff --git a/src/StatsServer.h b/src/StatsServer.h new file mode 100644 index 0000000..76ca497 --- /dev/null +++ b/src/StatsServer.h @@ -0,0 +1,130 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2014 Matthias P. Braendli + http://mpb.li + + A TCP Socket server that serves statistics for monitoring purposes. + + This server is very easy to integrate with munin + http://munin-monitoring.org/ + but is not specific to it. + + The TCP Server responds in JSON, and accepts two commands: + - config + -and- + - values + + */ +/* + This file is part of CRC-DabMux. + + CRC-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __STATS_SERVER_H +#define __STATS_SERVER_H + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <pthread.h> +#include <string> +#include <map> +#include <boost/thread.hpp> + +#define MIN_FILL_BUFFER_UNDEF (-1) + +struct InputStat +{ + InputStat() { reset(); } + + // minimum and maximum buffer fill since last reset + long min_fill_buffer; + long max_fill_buffer; + + // number of overruns and underruns since last reset + long num_underruns; + long num_overruns; + + void reset() + { + min_fill_buffer = MIN_FILL_BUFFER_UNDEF; + max_fill_buffer = 0; + + num_underruns = 0; + num_overruns = 0; + } + + std::string encodeJSON(); +}; + +class StatsServer +{ + public: + StatsServer(int listen_port) : + m_listenport(listen_port), + m_running(true), + m_thread(&StatsServer::serverThread, this) + {} + + void registerInput(std::string id); + // The input notifies the StatsServer about a new buffer size + void notifyBuffer(std::string id, long bufsize); + + void notifyUnderrun(std::string id); + void notifyOverrun(std::string id); + + private: + /******* TCP Socket Server ******/ + // no copying (because of the thread) + StatsServer(const StatsServer& other); + + void serverThread(); + + int m_listenport; + + // serverThread runs in a separate thread + bool m_running; + boost::thread m_thread; + + /******* Statistics Data ********/ + std::map<std::string, InputStat> m_inputStats; + + /* Return a description of the configuration that will + * allow to define what graphs to be created + * + * returns: a JSON encoded configuration + */ + std::string getConfigJSON(); + + /* Return the values for the statistics as defined in the configuration + * + * returns: JSON encoded statistics + */ + std::string getValuesJSON(); + + // mutex for accessing the map + mutable boost::mutex m_mutex; +}; + +#endif + diff --git a/src/TestStatsServer.cpp b/src/TestStatsServer.cpp new file mode 100644 index 0000000..7ec0342 --- /dev/null +++ b/src/TestStatsServer.cpp @@ -0,0 +1,26 @@ +#include <stdio.h> +#include "StatsServer.h" + +#define NUMOF(x) (sizeof(x) / sizeof(*x)) + +int main(int argc, char **argv) +{ + int stats_example[] = {25, 24, 22, 21, 56, 56, 54, 53, 51, 45, 42, 39, 34, 30, 24, 15, 8, 4, 1, 0}; + StatsServer serv(2720); + + serv.registerInput("foo"); + while (true) { + for (int i = 0; i < NUMOF(stats_example); i++) { + usleep(400000); + serv.notifyBuffer("foo", stats_example[i]); + fprintf(stderr, "give %d\n", stats_example[i]); + + if (stats_example[i] == 0) { + serv.notifyUnderrun("foo"); + } + } + } + + return 0; +} + diff --git a/src/test_statsserver.sh b/src/test_statsserver.sh new file mode 100755 index 0000000..b240cf9 --- /dev/null +++ b/src/test_statsserver.sh @@ -0,0 +1 @@ +clang++ -Wall --include=../config.h StatsServer.cpp TestStatsServer.cpp Log.cpp -lboost_system -lboost_thread -o test && ./test |