diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-17 15:25:23 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-01-17 15:25:23 +0100 | 
| commit | edb874717f758475d8fa1b433b71b601d1774e84 (patch) | |
| tree | c5bd1098e51bdae39d0bad972f64c5ff066ec379 | |
| parent | 0ec9156736c97ac2e6c5b20035634e17ea350c5e (diff) | |
| download | dabmux-edb874717f758475d8fa1b433b71b601d1774e84.tar.gz dabmux-edb874717f758475d8fa1b433b71b601d1774e84.tar.bz2 dabmux-edb874717f758475d8fa1b433b71b601d1774e84.zip  | |
add code for TCP statistics server
| -rw-r--r-- | src/StatsServer.cpp | 268 | ||||
| -rw-r--r-- | src/StatsServer.h | 130 | ||||
| -rw-r--r-- | src/TestStatsServer.cpp | 26 | ||||
| -rwxr-xr-x | src/test_statsserver.sh | 1 | 
4 files changed, 425 insertions, 0 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp new file mode 100644 index 0000000..1d2984e --- /dev/null +++ b/src/StatsServer.cpp @@ -0,0 +1,268 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2014 Matthias P. Braendli +   http://mpb.li + +   A TCP Socket server that serves statistics for monitoring purposes. + +   This server is very easy to integrate with munin http://munin-monitoring.org/ +   */ +/* +   This file is part of CRC-DabMux. + +   CRC-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   CRC-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with CRC-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#include <errno.h> +#include <string.h> +#include <sstream> +#include "StatsServer.h" +#include "Log.h" + +void StatsServer::registerInput(std::string id) +{ +    boost::mutex::scoped_lock lock(m_mutex); + +    if (m_inputStats.count(id) == 1) { +        etiLog.level(error) << +            "Double registration in Stats Server with id '" << +            id << "'"; +        return; +    } + +    InputStat is; +    m_inputStats[id] = is; +} + +void StatsServer::notifyBuffer(std::string id, long bufsize) +{ +    boost::mutex::scoped_lock lock(m_mutex); + +    if (m_inputStats.count(id) == 0) { +        etiLog.level(error) << +            "Stats Server id '" << +            id << "' does was not registered"; +        return; +    } + +    InputStat& is = m_inputStats[id]; +    if (bufsize > is.max_fill_buffer) { +        is.max_fill_buffer = bufsize; +    } + +    if (bufsize < is.min_fill_buffer || +            is.min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { +        is.min_fill_buffer = bufsize; +    } +} + +void StatsServer::notifyUnderrun(std::string id) +{ +    boost::mutex::scoped_lock lock(m_mutex); + +    if (m_inputStats.count(id) == 0) { +        etiLog.level(error) << +            "Stats Server id '" << +            id << "' does was not registered"; +        return; +    } + +    InputStat& is = m_inputStats[id]; +    is.num_underruns++; +} + +void StatsServer::notifyOverrun(std::string id) +{ +    boost::mutex::scoped_lock lock(m_mutex); + +    if (m_inputStats.count(id) == 0) { +        etiLog.level(error) << +            "Stats Server id '" << +            id << "' does was not registered"; +        return; +    } + +    InputStat& is = m_inputStats[id]; +    is.num_overruns++; +} + +std::string StatsServer::getConfigJSON() +{ +    std::ostringstream ss; +    ss << "{ \"config\" : [\n"; + +    std::map<std::string,InputStat>::iterator iter; +    bool first = true; +    for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter) +    { +        std::string id = iter->first; + +        if (! first) { +            ss << ", "; +            first = false; +        } + +        ss << " \"" << id << "\" "; +    } + +    ss << "] }\n"; + +    return ss.str(); +} + +std::string StatsServer::getValuesJSON() +{ +    std::ostringstream ss; +    ss << "{ \"values\" : {\n"; + +    std::map<std::string,InputStat>::iterator iter; +    bool first = true; +    for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter) +    { +        const std::string& id = iter->first; +        InputStat& stats = iter->second; + +        if (! first) { +            ss << " ,\n"; +            first = false; +        } + +        ss << " \"" << id << "\" : "; +        ss << stats.encodeJSON(); +        stats.reset(); +    } + +    ss << "}\n}\n"; + +    return ss.str(); +} +void StatsServer::serverThread() +{ +    int sock, accepted_sock; +    char buffer[256]; +    char welcome_msg[256]; +    struct sockaddr_in serv_addr, cli_addr; +    int n; + +#ifndef GITVERSION +#define GITVERSION "-r???" +#endif +    int welcome_msg_len = snprintf(welcome_msg, 256, +            "{ \"service\": \"" +            "%s %s%s Stats Server\" }\n", +            PACKAGE_NAME, PACKAGE_VERSION, GITVERSION); + + +    sock = socket(AF_INET, SOCK_STREAM, 0); +    if (sock < 0) { +        etiLog.level(error) << "Error opening Stats Server socket: " << +            strerror(errno); +        return; +    } + +    memset(&serv_addr, 0, sizeof(serv_addr)); +    serv_addr.sin_family = AF_INET; +    serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 +    serv_addr.sin_port = htons(m_listenport); + +    if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { +        etiLog.level(error) << "Error binding Stats Server socket: " << +            strerror(errno); +        goto end_serverthread; +    } + +    if (listen(sock, 5) < 0) { +        etiLog.level(error) << "Error listening on Stats Server socket: " << +            strerror(errno); +        goto end_serverthread; +    } + +    while (m_running) +    { +        socklen_t cli_addr_len = sizeof(cli_addr); + +        /* Accept actual connection from the client */ +        accepted_sock = accept(sock, (struct sockaddr *)&cli_addr, &cli_addr_len); +        if (accepted_sock < 0) { +            etiLog.level(warn) << "Stats Server cound not accept connection: " << +                strerror(errno); +            continue; +        } +        /* Send welcome message with version */ +        n = write(accepted_sock, welcome_msg, welcome_msg_len); +        if (n < 0) { +            etiLog.level(warn) << "Error writing to Stats Server socket " << +                strerror(errno); +            close(accepted_sock); +            continue; +        } + +        /* receive command */ +        memset(buffer, 256, 0); +        int n = read(accepted_sock, buffer, 255); +        if (n < 0) { +            etiLog.level(warn) << "Error reading from Stats Server socket " << +                strerror(errno); +            close(accepted_sock); +            continue; +        } + +        if (strcmp(buffer, "config\n") == 0) +        { +            std::string json = getConfigJSON(); +            n = write(accepted_sock, json.c_str(), json.size()); +        } +        else if (strcmp(buffer, "values\n") == 0) +        { +            std::string json = getValuesJSON(); +            n = write(accepted_sock, json.c_str(), json.size()); +        } +        else +        { +            int len = snprintf(buffer, 256, "Invalid command\n"); + +            n = write(accepted_sock, buffer, len); +            if (n < 0) { +                etiLog.level(warn) << "Error writing to Stats Server socket " << +                    strerror(errno); +            } +            close(accepted_sock); +            continue; +        } + +        close(accepted_sock); +    } + +end_serverthread: +    close(sock); +} + + +std::string InputStat::encodeJSON() +{ +    std::ostringstream ss; + +    ss << +    "{ \"inputstat\" : {" +        "\"min_fill\": " << min_fill_buffer << ", " +        "\"max_fill\": " << max_fill_buffer << ", " +        "\"num_underruns\": " << num_underruns << ", " +        "\"num_overruns\": " << num_overruns << +    " } }"; + +    return ss.str(); +} + diff --git a/src/StatsServer.h b/src/StatsServer.h new file mode 100644 index 0000000..76ca497 --- /dev/null +++ b/src/StatsServer.h @@ -0,0 +1,130 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2014 Matthias P. Braendli +   http://mpb.li + +   A TCP Socket server that serves statistics for monitoring purposes. + +   This server is very easy to integrate with munin +        http://munin-monitoring.org/ +   but is not specific to it. + +   The TCP Server responds in JSON, and accepts two commands: +    - config +       -and- +    - values + +   */ +/* +   This file is part of CRC-DabMux. + +   CRC-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   CRC-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with CRC-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#ifndef __STATS_SERVER_H +#define __STATS_SERVER_H + +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#include <pthread.h> +#include <string> +#include <map> +#include <boost/thread.hpp> + +#define MIN_FILL_BUFFER_UNDEF (-1) + +struct InputStat +{ +    InputStat() { reset(); } + +    // minimum and maximum buffer fill since last reset +    long min_fill_buffer; +    long max_fill_buffer; + +    // number of overruns and underruns since last reset +    long num_underruns; +    long num_overruns; + +    void reset() +    { +        min_fill_buffer = MIN_FILL_BUFFER_UNDEF; +        max_fill_buffer = 0; + +        num_underruns = 0; +        num_overruns = 0; +    } + +    std::string encodeJSON(); +}; + +class StatsServer +{ +    public: +        StatsServer(int listen_port) : +            m_listenport(listen_port), +            m_running(true), +            m_thread(&StatsServer::serverThread, this) +            {} + +        void registerInput(std::string id); +        // The input notifies the StatsServer about a new buffer size +        void notifyBuffer(std::string id, long bufsize); + +        void notifyUnderrun(std::string id); +        void notifyOverrun(std::string id); + +    private: +        /******* TCP Socket Server ******/ +        // no copying (because of the thread) +        StatsServer(const StatsServer& other); + +        void serverThread(); + +        int m_listenport; + +        // serverThread runs in a separate thread +        bool m_running; +        boost::thread m_thread; + +        /******* Statistics Data ********/ +        std::map<std::string, InputStat> m_inputStats; + +        /* Return a description of the configuration that will +         * allow to define what graphs to be created +         * +         * returns: a JSON encoded configuration +         */ +        std::string getConfigJSON(); + +        /* Return the values for the statistics as defined in the configuration +         * +         * returns: JSON encoded statistics +         */ +        std::string getValuesJSON(); + +        // mutex for accessing the map +        mutable boost::mutex m_mutex; +}; + +#endif + diff --git a/src/TestStatsServer.cpp b/src/TestStatsServer.cpp new file mode 100644 index 0000000..7ec0342 --- /dev/null +++ b/src/TestStatsServer.cpp @@ -0,0 +1,26 @@ +#include <stdio.h> +#include "StatsServer.h" + +#define NUMOF(x) (sizeof(x) / sizeof(*x)) + +int main(int argc, char **argv) +{ +    int stats_example[] = {25, 24, 22, 21, 56, 56, 54, 53, 51, 45, 42, 39, 34, 30, 24, 15, 8, 4, 1, 0}; +    StatsServer serv(2720); + +    serv.registerInput("foo"); +    while (true) { +        for (int i = 0; i < NUMOF(stats_example); i++) { +            usleep(400000); +            serv.notifyBuffer("foo", stats_example[i]); +            fprintf(stderr, "give %d\n", stats_example[i]); + +            if (stats_example[i] == 0) { +                serv.notifyUnderrun("foo"); +            } +        } +    } + +    return 0; +} + diff --git a/src/test_statsserver.sh b/src/test_statsserver.sh new file mode 100755 index 0000000..b240cf9 --- /dev/null +++ b/src/test_statsserver.sh @@ -0,0 +1 @@ +clang++ -Wall --include=../config.h  StatsServer.cpp TestStatsServer.cpp Log.cpp -lboost_system -lboost_thread -o test && ./test  | 
