From edb874717f758475d8fa1b433b71b601d1774e84 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 17 Jan 2014 15:25:23 +0100 Subject: add code for TCP statistics server --- src/StatsServer.cpp | 268 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 src/StatsServer.cpp (limited to 'src/StatsServer.cpp') diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp new file mode 100644 index 0000000..1d2984e --- /dev/null +++ b/src/StatsServer.cpp @@ -0,0 +1,268 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2014 Matthias P. Braendli + http://mpb.li + + A TCP Socket server that serves statistics for monitoring purposes. + + This server is very easy to integrate with munin http://munin-monitoring.org/ + */ +/* + This file is part of CRC-DabMux. + + CRC-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DabMux. If not, see . + */ + +#include +#include +#include +#include "StatsServer.h" +#include "Log.h" + +void StatsServer::registerInput(std::string id) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 1) { + etiLog.level(error) << + "Double registration in Stats Server with id '" << + id << "'"; + return; + } + + InputStat is; + m_inputStats[id] = is; +} + +void StatsServer::notifyBuffer(std::string id, long bufsize) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 0) { + etiLog.level(error) << + "Stats Server id '" << + id << "' does was not registered"; + return; + } + + InputStat& is = m_inputStats[id]; + if (bufsize > is.max_fill_buffer) { + is.max_fill_buffer = bufsize; + } + + if (bufsize < is.min_fill_buffer || + is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { + is.min_fill_buffer = bufsize; + } +} + +void StatsServer::notifyUnderrun(std::string id) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 0) { + etiLog.level(error) << + "Stats Server id '" << + id << "' does was not registered"; + return; + } + + InputStat& is = m_inputStats[id]; + is.num_underruns++; +} + +void StatsServer::notifyOverrun(std::string id) +{ + boost::mutex::scoped_lock lock(m_mutex); + + if (m_inputStats.count(id) == 0) { + etiLog.level(error) << + "Stats Server id '" << + id << "' does was not registered"; + return; + } + + InputStat& is = m_inputStats[id]; + is.num_overruns++; +} + +std::string StatsServer::getConfigJSON() +{ + std::ostringstream ss; + ss << "{ \"config\" : [\n"; + + std::map::iterator iter; + bool first = true; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter) + { + std::string id = iter->first; + + if (! first) { + ss << ", "; + first = false; + } + + ss << " \"" << id << "\" "; + } + + ss << "] }\n"; + + return ss.str(); +} + +std::string StatsServer::getValuesJSON() +{ + std::ostringstream ss; + ss << "{ \"values\" : {\n"; + + std::map::iterator iter; + bool first = true; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter) + { + const std::string& id = iter->first; + InputStat& stats = iter->second; + + if (! first) { + ss << " ,\n"; + first = false; + } + + ss << " \"" << id << "\" : "; + ss << stats.encodeJSON(); + stats.reset(); + } + + ss << "}\n}\n"; + + return ss.str(); +} +void StatsServer::serverThread() +{ + int sock, accepted_sock; + char buffer[256]; + char welcome_msg[256]; + struct sockaddr_in serv_addr, cli_addr; + int n; + +#ifndef GITVERSION +#define GITVERSION "-r???" +#endif + int welcome_msg_len = snprintf(welcome_msg, 256, + "{ \"service\": \"" + "%s %s%s Stats Server\" }\n", + PACKAGE_NAME, PACKAGE_VERSION, GITVERSION); + + + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + etiLog.level(error) << "Error opening Stats Server socket: " << + strerror(errno); + return; + } + + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 + serv_addr.sin_port = htons(m_listenport); + + if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + etiLog.level(error) << "Error binding Stats Server socket: " << + strerror(errno); + goto end_serverthread; + } + + if (listen(sock, 5) < 0) { + etiLog.level(error) << "Error listening on Stats Server socket: " << + strerror(errno); + goto end_serverthread; + } + + while (m_running) + { + socklen_t cli_addr_len = sizeof(cli_addr); + + /* Accept actual connection from the client */ + accepted_sock = accept(sock, (struct sockaddr *)&cli_addr, &cli_addr_len); + if (accepted_sock < 0) { + etiLog.level(warn) << "Stats Server cound not accept connection: " << + strerror(errno); + continue; + } + /* Send welcome message with version */ + n = write(accepted_sock, welcome_msg, welcome_msg_len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + /* receive command */ + memset(buffer, 256, 0); + int n = read(accepted_sock, buffer, 255); + if (n < 0) { + etiLog.level(warn) << "Error reading from Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + if (strcmp(buffer, "config\n") == 0) + { + std::string json = getConfigJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "values\n") == 0) + { + std::string json = getValuesJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else + { + int len = snprintf(buffer, 256, "Invalid command\n"); + + n = write(accepted_sock, buffer, len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + } + close(accepted_sock); + continue; + } + + close(accepted_sock); + } + +end_serverthread: + close(sock); +} + + +std::string InputStat::encodeJSON() +{ + std::ostringstream ss; + + ss << + "{ \"inputstat\" : {" + "\"min_fill\": " << min_fill_buffer << ", " + "\"max_fill\": " << max_fill_buffer << ", " + "\"num_underruns\": " << num_underruns << ", " + "\"num_overruns\": " << num_overruns << + " } }"; + + return ss.str(); +} + -- cgit v1.2.3