diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-03-07 17:19:31 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-03-07 17:27:12 +0100 |
commit | 3f4214227e993305bb320b299245f466b9b22233 (patch) | |
tree | 0863975f29692d0939e3acc53b76a00e97f2aadc /src/StatsServer.cpp | |
parent | 0414d5788090bb6df728d370079e44e95b4ffd20 (diff) | |
download | dabmux-3f4214227e993305bb320b299245f466b9b22233.tar.gz dabmux-3f4214227e993305bb320b299245f466b9b22233.tar.bz2 dabmux-3f4214227e993305bb320b299245f466b9b22233.zip |
Merge Stats and Config server, config is also JSON
Diffstat (limited to 'src/StatsServer.cpp')
-rw-r--r-- | src/StatsServer.cpp | 409 |
1 files changed, 0 insertions, 409 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp deleted file mode 100644 index 558a479..0000000 --- a/src/StatsServer.cpp +++ /dev/null @@ -1,409 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2014 Matthias P. Braendli - http://mpb.li - - A TCP Socket server that serves state information and statistics for - monitoring purposes. - - This server is very easy to integrate with munin http://munin-monitoring.org/ - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include <errno.h> -#include <string.h> -#include <math.h> -#include <stdint.h> -#include <limits> -#include <sstream> -#include <ctime> -#include <boost/thread.hpp> -#include "StatsServer.h" -#include "Log.h" - -void StatsServer::registerInput(InputStat* is) -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::string id(is->get_name()); - - if (m_inputStats.count(id) == 1) { - etiLog.level(error) << - "Double registration in Stats Server with id '" << - id << "'"; - return; - } - - m_inputStats[id] = is; -} - -void StatsServer::unregisterInput(std::string id) -{ - boost::mutex::scoped_lock lock(m_mutex); - - if (m_inputStats.count(id) == 1) { - m_inputStats.erase(id); - } -} - - -bool StatsServer::isInputRegistered(std::string& id) -{ - boost::mutex::scoped_lock lock(m_mutex); - - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return false; - } - return true; -} - -std::string StatsServer::getConfigJSON() -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::ostringstream ss; - ss << "{ \"config\" : [\n"; - - std::map<std::string,InputStat*>::iterator iter; - int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); - ++iter, i++) - { - std::string id = iter->first; - - if (i > 0) { - ss << ", "; - } - - ss << " \"" << id << "\" "; - } - - ss << "] }\n"; - - return ss.str(); -} - -std::string StatsServer::getValuesJSON() -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::ostringstream ss; - ss << "{ \"values\" : {\n"; - - std::map<std::string,InputStat*>::iterator iter; - int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); - ++iter, i++) - { - const std::string& id = iter->first; - InputStat* stats = iter->second; - - if (i > 0) { - ss << " ,\n"; - } - - ss << " \"" << id << "\" : "; - ss << stats->encodeValuesJSON(); - stats->reset(); - } - - ss << "}\n}\n"; - - return ss.str(); -} - -std::string StatsServer::getStateJSON() -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::ostringstream ss; - ss << "{\n"; - - std::map<std::string,InputStat*>::iterator iter; - int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); - ++iter, i++) - { - const std::string& id = iter->first; - InputStat* stats = iter->second; - - if (i > 0) { - ss << " ,\n"; - } - - ss << " \"" << id << "\" : "; - ss << stats->encodeStateJSON(); - stats->reset(); - } - - ss << "}\n"; - - return ss.str(); -} - -void StatsServer::restart() -{ - m_restarter_thread = boost::thread(&StatsServer::restart_thread, - this, 0); -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void StatsServer::restart_thread(long) -{ - m_running = false; - - if (m_listenport) { - m_thread.interrupt(); - m_thread.join(); - } - - m_thread = boost::thread(&StatsServer::serverThread, this); -} - -void StatsServer::serverThread() -{ - m_fault = false; - - try { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "{ \"service\": \"" - "%s %s Stats Server\" }\n", - PACKAGE_NAME, -#if defined(GITVERSION) - GITVERSION -#else - PACKAGE_VERSION -#endif - ); - - - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << "Error opening Stats Server socket: " << - strerror(errno); - m_fault = true; - return; - } - - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); - - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << "Error binding Stats Server socket: " << - strerror(errno); - goto end_serverthread; - } - - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << "Error listening on Stats Server socket: " << - strerror(errno); - goto end_serverthread; - } - - m_running = true; - - while (m_running) { - socklen_t cli_addr_len = sizeof(cli_addr); - - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, - (struct sockaddr *)&cli_addr, - &cli_addr_len); - - if (accepted_sock < 0) { - etiLog.level(warn) << "Stats Server cound not accept connection: " << - strerror(errno); - continue; - } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } - - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << "Error reading from Stats Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } - - if (strcmp(buffer, "config\n") == 0) { - std::string json = getConfigJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "values\n") == 0) { - std::string json = getValuesJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "state\n") == 0) { - std::string json = getStateJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else { - int len = snprintf(buffer, 256, "Invalid command\n"); - n = write(accepted_sock, buffer, len); - } - - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << - strerror(errno); - } - close(accepted_sock); - } - -end_serverthread: - m_fault = true; - close(m_sock); - - } - catch (std::exception& e) { - etiLog.level(error) << "Statistics server caught exception: " << e.what(); - m_fault = true; - } -} - -/************************************************/ - -void InputStat::registerAtServer() -{ - global_stats->registerInput(this); -} - -InputStat::~InputStat() -{ - global_stats->unregisterInput(m_name); -} - -std::string InputStat::encodeValuesJSON() -{ - std::ostringstream ss; - - const int16_t int16_max = std::numeric_limits<int16_t>::max(); - - boost::mutex::scoped_lock lock(m_mutex); - - /* convert to dB */ - int dB_l = peak_left ? round(20*log10((double)peak_left / int16_max)) : -90; - int dB_r = peak_right ? round(20*log10((double)peak_right / int16_max)) : -90; - - ss << - "{ \"inputstat\" : {" - "\"min_fill\": " << min_fill_buffer << ", " - "\"max_fill\": " << max_fill_buffer << ", " - "\"peak_left\": " << dB_l << ", " - "\"peak_right\": " << dB_r << ", " - "\"num_underruns\": " << num_underruns << ", " - "\"num_overruns\": " << num_overruns << - " } }"; - - return ss.str(); -} - -std::string InputStat::encodeStateJSON() -{ - std::ostringstream ss; - - ss << "{ \"state\" : "; - - switch (determineState()) { - case NoData: - ss << "\"NoData\""; - break; - case Unstable: - ss << "\"Unstable\""; - break; - case Silence: - ss << "\"Silent\""; - break; - case Streaming: - ss << "\"Streaming\""; - break; - default: - ss << "\"Unknown\""; - } - - ss << " }"; - - return ss.str(); -} - -input_state_t InputStat::determineState(void) -{ - boost::mutex::scoped_lock lock(m_mutex); - - time_t now = time(NULL); - input_state_t state; - - /* if the last event was more that INPUT_COUNTER_RESET_TIME - * minutes ago, the timeout has expired. We can reset our - * glitch counter. - */ - if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) { - m_glitch_counter = 0; - } - - // STATE CALCULATION - - /* If the buffer has been empty for more than - * INPUT_NODATA_TIMEOUT, we go to the NoData state. - */ - if (m_buffer_empty && - now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) { - state = NoData; - } - - /* Otherwise, the state depends on the glitch counter */ - else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) { - state = Unstable; - } - else { - /* The input is streaming, check if the audio level is too low */ - - if (m_silence_counter > INPUT_AUDIO_LEVEL_SILENCE_COUNT) { - state = Silence; - } - else { - state = Streaming; - } - } - - return state; -} - |