From 3f4214227e993305bb320b299245f466b9b22233 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 7 Mar 2015 17:19:31 +0100 Subject: Merge Stats and Config server, config is also JSON --- doc/example.mux | 4 +- src/ConfigParser.cpp | 7 +- src/ConfigParser.h | 2 +- src/ConfigServer.cpp | 177 -------------------- src/ConfigServer.h | 120 ------------- src/DabMux.cpp | 41 ++--- src/Makefile.am | 5 +- src/ManagementServer.cpp | 424 ++++++++++++++++++++++++++++++++++++++++++++++ src/ManagementServer.h | 427 +++++++++++++++++++++++++++++++++++++++++++++++ src/StatsServer.cpp | 409 --------------------------------------------- src/StatsServer.h | 392 ------------------------------------------- src/dabInputZmq.cpp | 3 - src/dabInputZmq.h | 2 +- 13 files changed, 878 insertions(+), 1135 deletions(-) delete mode 100644 src/ConfigServer.cpp delete mode 100644 src/ConfigServer.h create mode 100644 src/ManagementServer.cpp create mode 100644 src/ManagementServer.h delete mode 100644 src/StatsServer.cpp delete mode 100644 src/StatsServer.h diff --git a/doc/example.mux b/doc/example.mux index 09151e9..abf01bc 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -33,14 +33,14 @@ general { ; This also enables time encoding using the MNSC. tist false - ; The statsserver is a simple TCP server that can present + ; The management server is a simple TCP server that can present ; statistics data (buffers, overruns, underruns, etc) ; which can then be graphed a tool like Munin ; The doc/stats_dabmux_multi.py tool is a suitable ; plugin for that. ; If the port is zero, or the line commented, the server ; is not started. - statsserverport 12720 + managementport 12720 } remotecontrol { diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 2c4ab63..8c5cbaa 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -67,7 +67,7 @@ #include "dabInputDmbUdp.h" #include "dabInputZmq.h" #include "DabMux.h" -#include "StatsServer.h" +#include "ManagementServer.h" #ifdef _WIN32 @@ -132,7 +132,7 @@ void parse_ptree(boost::property_tree::ptree& pt, bool* factumAnalyzer, unsigned long* limit, BaseRemoteController** rc, - int* statsServerPort, + int* mgmtserverport, edi_configuration_t* edi ) { @@ -169,7 +169,8 @@ void parse_ptree(boost::property_tree::ptree& pt, *enableTist = pt_general.get("tist", false); - *statsServerPort = pt_general.get("statsserverport", 0); + *mgmtserverport = pt_general.get("managementport", + pt_general.get("statsserverport", 0) ); /************** READ REMOTE CONTROL PARAMETERS *************/ ptree pt_rc = pt.get_child("remotecontrol"); diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 16d2146..4af010a 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -45,7 +45,7 @@ void parse_ptree(boost::property_tree::ptree& pt, bool* factumAnalyzer, unsigned long* limit, BaseRemoteController** rc, - int* statsServerPort, + int* mgmtserverport, edi_configuration_t* edi); void setup_subchannel_from_ptree(dabSubchannel* subchan, diff --git a/src/ConfigServer.cpp b/src/ConfigServer.cpp deleted file mode 100644 index e2eefd4..0000000 --- a/src/ConfigServer.cpp +++ /dev/null @@ -1,177 +0,0 @@ -/* - Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, - 2011, 2012 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2014 - Matthias P. Braendli, matthias.braendli@mpb.li - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . -*/ - -#include "ConfigServer.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include "Log.h" - -void ConfigServer::serverThread() -{ - m_fault = false; - - try { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "%s %s Config Server\n", - PACKAGE_NAME, -#if defined(GITVERSION) - GITVERSION -#else - PACKAGE_VERSION -#endif - ); - - - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << "Error opening Config Server socket: " << - strerror(errno); - m_fault = true; - return; - } - - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); - - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << "Error binding Config Server socket: " << - strerror(errno); - goto end_serverthread; - } - - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << "Error listening on Config Server socket: " << - strerror(errno); - goto end_serverthread; - } - - m_running = true; - - while (m_running) { - socklen_t cli_addr_len = sizeof(cli_addr); - - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, - (struct sockaddr *)&cli_addr, - &cli_addr_len); - - if (accepted_sock < 0) { - etiLog.level(warn) << "Config Server cound not accept connection: " << - strerror(errno); - continue; - } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << "Error writing to Config Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } - - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << "Error reading from Config Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } - - if (strcmp(buffer, "getconfig\n") == 0) { - boost::unique_lock lock(m_mutex); - m_pending_request = "getconfig"; - m_pending = true; - - while (m_pending) { - m_condition.wait(lock); - } - std::stringstream ss; - boost::property_tree::info_parser::write_info(ss, m_pt); - - std::string response = ss.str(); - - n = write(accepted_sock, response.c_str(), response.size()); - } - else { - int len = snprintf(buffer, 256, "Invalid command\n"); - n = write(accepted_sock, buffer, len); - } - - if (n < 0) { - etiLog.level(warn) << "Error writing to Config Server socket " << - strerror(errno); - } - close(accepted_sock); - } - -end_serverthread: - m_fault = true; - close(m_sock); - - } - catch (std::exception& e) { - etiLog.level(error) << "Config server caught exception: " << e.what(); - m_fault = true; - } -} - -void ConfigServer::restart() -{ - m_restarter_thread = boost::thread(&ConfigServer::restart_thread, - this, 0); -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void ConfigServer::restart_thread(long) -{ - m_running = false; - - if (m_listenport) { - m_thread.interrupt(); - m_thread.join(); - } - - m_thread = boost::thread(&ConfigServer::serverThread, this); -} - diff --git a/src/ConfigServer.h b/src/ConfigServer.h deleted file mode 100644 index b51ef32..0000000 --- a/src/ConfigServer.h +++ /dev/null @@ -1,120 +0,0 @@ -/* - Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, - 2011, 2012 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2014 - Matthias P. Braendli, matthias.braendli@mpb.li - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . -*/ - -#ifndef __CONFIG_SERVER_H_ -#define __CONFIG_SERVER_H_ - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -class ConfigServer -{ - public: - ConfigServer() : - m_listenport(0), - m_running(false), - m_fault(false) - { } - - ConfigServer(int listen_port) : - m_listenport(listen_port), - m_running(false), - m_fault(false), - m_thread(&ConfigServer::serverThread, this) - { - m_sock = 0; - } - - ~ConfigServer() - { - m_running = false; - m_fault = false; - if (m_sock) { - close(m_sock); - } - m_thread.join(); - } - - bool request_pending() { - return m_pending; - } - - void update_ptree(const boost::property_tree::ptree& pt) { - boost::unique_lock lock(m_mutex); - m_pt = pt; - m_pending = false; - - m_condition.notify_one(); - } - - bool fault_detected() { return m_fault; } - void restart(void); - - private: - void restart_thread(long); - - /******* TCP Socket Server ******/ - // no copying (because of the thread) - ConfigServer(const ConfigServer& other); - - void serverThread(void); - - int m_listenport; - - // serverThread runs in a separate thread - bool m_running; - bool m_fault; - boost::thread m_thread; - boost::thread m_restarter_thread; - - int m_sock; - - bool m_pending; - std::string m_pending_request; - boost::condition_variable m_condition; - boost::mutex m_mutex; - - boost::property_tree::ptree m_pt; -}; - - -#endif - diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 950eefe..cbbd612 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -3,8 +3,10 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -127,8 +129,7 @@ typedef DWORD32 uint32_t; #include "utils.h" #include "ParserCmdline.h" #include "ConfigParser.h" -#include "StatsServer.h" -#include "ConfigServer.h" +#include "ManagementServer.h" #include "Log.h" #include "RemoteControl.h" @@ -137,10 +138,8 @@ using boost::property_tree::ptree; using boost::property_tree::ptree_error; -/* Global stats server */ -StatsServer* global_stats; - -ConfigServer config_server(8001); +/* Global stats and config server */ +ManagementServer* mgmt_server; class MuxInitException : public exception { @@ -333,7 +332,7 @@ int main(int argc, char *argv[]) bool enableTist = false; unsigned timestamp = 0; - int statsserverport = 0; + int mgmtserverport = 0; edi_configuration_t edi_conf; @@ -373,7 +372,7 @@ int main(int argc, char *argv[]) read_info(conf_file, pt); parse_ptree(pt, outputs, ensemble, &enableTist, &FICL, - &factumAnalyzer, &limit, &rc, &statsserverport, &edi_conf); + &factumAnalyzer, &limit, &rc, &mgmtserverport, &edi_conf); } catch (runtime_error &e) { etiLog.log(error, "Configuration file parsing error: %s\n", @@ -393,7 +392,7 @@ int main(int argc, char *argv[]) string conf_file = argv[2]; parse_configfile(conf_file, outputs, ensemble, &enableTist, &FICL, - &factumAnalyzer, &limit, &rc, &statsserverport, &edi_conf); + &factumAnalyzer, &limit, &rc, &mgmtserverport, &edi_conf); } catch (runtime_error &e) { @@ -415,11 +414,11 @@ int main(int argc, char *argv[]) } #endif - if (statsserverport != 0) { - global_stats = new StatsServer(statsserverport); + if (mgmtserverport != 0) { + mgmt_server = new ManagementServer(mgmtserverport); } else { - global_stats = new StatsServer(); + mgmt_server = new ManagementServer(); } if (rc) { @@ -2174,21 +2173,15 @@ int main(int argc, char *argv[]) } /* Same for statistics server */ - if (global_stats && fc->FCT == 249 && global_stats->fault_detected()) { + if (mgmt_server && fc->FCT % 10 == 0) { + if (mgmt_server->fault_detected()) { etiLog.level(warn) << "Detected Statistics Server fault, restarting it"; - global_stats->restart(); - } - - if (fc->FCT % 10 == 0) { - if (config_server.fault_detected()) { - config_server.restart(); + mgmt_server->restart(); } - - if (config_server.request_pending()) { - config_server.update_ptree(pt); + else if (mgmt_server->request_pending()) { + mgmt_server->update_ptree(pt); } - } } diff --git a/src/Makefile.am b/src/Makefile.am index aa1f86a..f3bce5e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -79,7 +79,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ MuxElements.cpp MuxElements.h \ RemoteControl.cpp RemoteControl.h \ ParserCmdline.cpp ParserCmdline.h \ - ParserConfig.cpp ParserConfig.h \ + ConfigParser.cpp ConfigParser.h \ Eti.h Eti.cpp \ Log.h Log.cpp \ UdpSocket.h UdpSocket.cpp \ @@ -92,8 +92,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ Interleaver.h Interleaver.cpp \ ReedSolomon.h ReedSolomon.cpp \ mpeg.h mpeg.c \ - StatsServer.h StatsServer.cpp \ - ConfigServer.h ConfigServer.cpp \ + ManagementServer.h ManagementServer.cpp \ TcpServer.h TcpServer.cpp \ TcpSocket.h TcpSocket.cpp \ zmq.hpp diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp new file mode 100644 index 0000000..9ebdfeb --- /dev/null +++ b/src/ManagementServer.cpp @@ -0,0 +1,424 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2014, 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + A TCP Socket server that serves state information and statistics for + monitoring purposes, and also serves the internal configuration + property tree. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "ManagementServer.h" +#include "Log.h" + +void ManagementServer::registerInput(InputStat* is) +{ + boost::mutex::scoped_lock lock(m_statsmutex); + + std::string id(is->get_name()); + + if (m_inputStats.count(id) == 1) { + etiLog.level(error) << + "Double registration in Stats Server with id '" << + id << "'"; + return; + } + + m_inputStats[id] = is; +} + +void ManagementServer::unregisterInput(std::string id) +{ + boost::mutex::scoped_lock lock(m_statsmutex); + + if (m_inputStats.count(id) == 1) { + m_inputStats.erase(id); + } +} + + +bool ManagementServer::isInputRegistered(std::string& id) +{ + boost::mutex::scoped_lock lock(m_statsmutex); + + if (m_inputStats.count(id) == 0) { + etiLog.level(error) << + "Stats Server id '" << + id << "' does was not registered"; + return false; + } + return true; +} + +std::string ManagementServer::getStatConfigJSON() +{ + boost::mutex::scoped_lock lock(m_statsmutex); + + std::ostringstream ss; + ss << "{ \"config\" : [\n"; + + std::map::iterator iter; + int i = 0; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + ++iter, i++) + { + std::string id = iter->first; + + if (i > 0) { + ss << ", "; + } + + ss << " \"" << id << "\" "; + } + + ss << "] }\n"; + + return ss.str(); +} + +std::string ManagementServer::getValuesJSON() +{ + boost::mutex::scoped_lock lock(m_statsmutex); + + std::ostringstream ss; + ss << "{ \"values\" : {\n"; + + std::map::iterator iter; + int i = 0; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + ++iter, i++) + { + const std::string& id = iter->first; + InputStat* stats = iter->second; + + if (i > 0) { + ss << " ,\n"; + } + + ss << " \"" << id << "\" : "; + ss << stats->encodeValuesJSON(); + stats->reset(); + } + + ss << "}\n}\n"; + + return ss.str(); +} + +std::string ManagementServer::getStateJSON() +{ + boost::mutex::scoped_lock lock(m_statsmutex); + + std::ostringstream ss; + ss << "{\n"; + + std::map::iterator iter; + int i = 0; + for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + ++iter, i++) + { + const std::string& id = iter->first; + InputStat* stats = iter->second; + + if (i > 0) { + ss << " ,\n"; + } + + ss << " \"" << id << "\" : "; + ss << stats->encodeStateJSON(); + stats->reset(); + } + + ss << "}\n"; + + return ss.str(); +} + +void ManagementServer::restart() +{ + m_restarter_thread = boost::thread(&ManagementServer::restart_thread, + this, 0); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void ManagementServer::restart_thread(long) +{ + m_running = false; + + if (m_listenport) { + m_thread.interrupt(); + m_thread.join(); + } + + m_thread = boost::thread(&ManagementServer::serverThread, this); +} + +void ManagementServer::serverThread() +{ + m_fault = false; + + try { + int accepted_sock; + char buffer[256]; + char welcome_msg[256]; + struct sockaddr_in serv_addr, cli_addr; + int n; + + int welcome_msg_len = snprintf(welcome_msg, 256, + "{ \"service\": \"" + "%s %s Stats Server\" }\n", + PACKAGE_NAME, +#if defined(GITVERSION) + GITVERSION +#else + PACKAGE_VERSION +#endif + ); + + + m_sock = socket(AF_INET, SOCK_STREAM, 0); + if (m_sock < 0) { + etiLog.level(error) << "Error opening Stats Server socket: " << + strerror(errno); + m_fault = true; + return; + } + + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 + serv_addr.sin_port = htons(m_listenport); + + if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + etiLog.level(error) << "Error binding Stats Server socket: " << + strerror(errno); + goto end_serverthread; + } + + if (listen(m_sock, 5) < 0) { + etiLog.level(error) << "Error listening on Stats Server socket: " << + strerror(errno); + goto end_serverthread; + } + + m_running = true; + + while (m_running) { + socklen_t cli_addr_len = sizeof(cli_addr); + + /* Accept actual connection from the client */ + accepted_sock = accept(m_sock, + (struct sockaddr *)&cli_addr, + &cli_addr_len); + + if (accepted_sock < 0) { + etiLog.level(warn) << "Stats Server cound not accept connection: " << + strerror(errno); + continue; + } + /* Send welcome message with version */ + n = write(accepted_sock, welcome_msg, welcome_msg_len); + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + /* receive command */ + memset(buffer, 0, 256); + int n = read(accepted_sock, buffer, 255); + if (n < 0) { + etiLog.level(warn) << "Error reading from Stats Server socket " << + strerror(errno); + close(accepted_sock); + continue; + } + + if (strcmp(buffer, "config\n") == 0) { + std::string json = getStatConfigJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "values\n") == 0) { + std::string json = getValuesJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + else if (strcmp(buffer, "state\n") == 0) { + std::string json = getStateJSON(); + n = write(accepted_sock, json.c_str(), json.size()); + } + if (strcmp(buffer, "getptree\n") == 0) { + boost::unique_lock lock(m_configmutex); + m_pending = true; + + while (m_pending) { + m_condition.wait(lock); + } + std::stringstream ss; + boost::property_tree::json_parser::write_json(ss, m_pt); + + std::string response = ss.str(); + + n = write(accepted_sock, response.c_str(), response.size()); + } + else { + int len = snprintf(buffer, 256, "Invalid command\n"); + n = write(accepted_sock, buffer, len); + } + + if (n < 0) { + etiLog.level(warn) << "Error writing to Stats Server socket " << + strerror(errno); + } + close(accepted_sock); + } + +end_serverthread: + m_fault = true; + close(m_sock); + + } + catch (std::exception& e) { + etiLog.level(error) << "Statistics server caught exception: " << e.what(); + m_fault = true; + } +} + +/************************************************/ + +void InputStat::registerAtServer() +{ + mgmt_server->registerInput(this); +} + +InputStat::~InputStat() +{ + mgmt_server->unregisterInput(m_name); +} + +std::string InputStat::encodeValuesJSON() +{ + std::ostringstream ss; + + const int16_t int16_max = std::numeric_limits::max(); + + boost::mutex::scoped_lock lock(m_mutex); + + /* convert to dB */ + int dB_l = peak_left ? round(20*log10((double)peak_left / int16_max)) : -90; + int dB_r = peak_right ? round(20*log10((double)peak_right / int16_max)) : -90; + + ss << + "{ \"inputstat\" : {" + "\"min_fill\": " << min_fill_buffer << ", " + "\"max_fill\": " << max_fill_buffer << ", " + "\"peak_left\": " << dB_l << ", " + "\"peak_right\": " << dB_r << ", " + "\"num_underruns\": " << num_underruns << ", " + "\"num_overruns\": " << num_overruns << + " } }"; + + return ss.str(); +} + +std::string InputStat::encodeStateJSON() +{ + std::ostringstream ss; + + ss << "{ \"state\" : "; + + switch (determineState()) { + case NoData: + ss << "\"NoData\""; + break; + case Unstable: + ss << "\"Unstable\""; + break; + case Silence: + ss << "\"Silent\""; + break; + case Streaming: + ss << "\"Streaming\""; + break; + default: + ss << "\"Unknown\""; + } + + ss << " }"; + + return ss.str(); +} + +input_state_t InputStat::determineState(void) +{ + boost::mutex::scoped_lock lock(m_mutex); + + time_t now = time(NULL); + input_state_t state; + + /* if the last event was more that INPUT_COUNTER_RESET_TIME + * minutes ago, the timeout has expired. We can reset our + * glitch counter. + */ + if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) { + m_glitch_counter = 0; + } + + // STATE CALCULATION + + /* If the buffer has been empty for more than + * INPUT_NODATA_TIMEOUT, we go to the NoData state. + */ + if (m_buffer_empty && + now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) { + state = NoData; + } + + /* Otherwise, the state depends on the glitch counter */ + else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) { + state = Unstable; + } + else { + /* The input is streaming, check if the audio level is too low */ + + if (m_silence_counter > INPUT_AUDIO_LEVEL_SILENCE_COUNT) { + state = Silence; + } + else { + state = Streaming; + } + } + + return state; +} + diff --git a/src/ManagementServer.h b/src/ManagementServer.h new file mode 100644 index 0000000..fa3f170 --- /dev/null +++ b/src/ManagementServer.h @@ -0,0 +1,427 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2014, 2015 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + A TCP Socket server that serves state information and statistics for + monitoring purposes, and also serves the internal configuration + property tree. + + This statistics server is very easy to integrate with munin + http://munin-monitoring.org/ + but is not specific to it. + + The TCP Server responds in JSON, and accepts the commands: + - config + - values + Inspired by the munin equivalent + + - state + Returns the state of each input + + - getptree + Returns the internal boost property_tree that contains the + multiplexer configuration DB. + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#ifndef __MANAGEMENT_SERVER_H +#define __MANAGEMENT_SERVER_H + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define MIN_FILL_BUFFER_UNDEF (-1) + +/*** State handing ***/ +/* An input can be in one of the following three states: + */ +enum input_state_t +{ + /* The input is waiting for data, all buffers are empty */ + NoData, + + /* The input is running, but has seen many underruns or overruns recently */ + Unstable, + + /* The input is running, but the audio level is too low, or has + * been too low recently + */ + Silence, + + /* The input is running stable */ + Streaming +}; + + +/* The delay after which the glitch counter is reset + */ +#define INPUT_COUNTER_RESET_TIME 30 // minutes + +/* How many glitches we tolerate in Streaming state before + * we consider the input Unstable + */ +#define INPUT_UNSTABLE_THRESHOLD 3 + +/* For how long the input buffers must be empty before we move an input to the + * NoData state. + */ +#define INPUT_NODATA_TIMEOUT 30 // seconds + +/* For silence detection, we count the number of occurrences the audio level + * falls below a threshold. + * + * The counter is decreased for each frame that has good audio level. + * + * The counter saturates, and this value defines for how long the + * input will be considered silent after a cut. + * + * If the count reaches a certain value, the input changes state + * to Silence. + */ +#define INPUT_AUDIO_LEVEL_THRESHOLD -50 // dB +#define INPUT_AUDIO_LEVEL_SILENCE_COUNT 100 // superframes (120ms) +#define INPUT_AUDIO_LEVEL_COUNT_SATURATION 500 // superframes (120ms) + +/* An example of how the state changes work. + * The timeout is set to expire in 30 minutes + * at each under-/overrun. + * + * The glitch counter is increased by one for each glitch (can be a + * saturating counter), and set to zero when the counter timeout expires. + * + * The state is then simply depending on the glitch counter value. + * + * Graphical example: + + state STREAMING | UNSTABLE | STREAMING + xruns U U U + glitch + counter 0 1 2 3 0 + reset + timeout \ |\ |\ |\ + \ | \ | \ | \ + \ | \ | \ | \ + \| \ | \| \ + ` \| ` \ + ` \ + \ + \ + \ + \ + timeout expires ___________________\ + <--30min--> + */ + +/* InputStat takes care of + * - saving the statistics for graphing + * - calculating the state of the input for monitoring + */ +class InputStat +{ + public: + InputStat(std::string name) : m_name(name) + { + /* Statistics */ + num_underruns = 0; + num_overruns = 0; + + /* State handling */ + time_t now = time(NULL); + m_time_last_event = now; + m_time_last_buffer_nonempty = 0; + m_buffer_empty = true; + m_glitch_counter = 0; + m_silence_counter = 0; + + reset(); + } + + void registerAtServer(void); + + ~InputStat(); + + // Gets called each time the statistics are transmitted, + // and resets the counters to zero + void reset(void) + { + min_fill_buffer = MIN_FILL_BUFFER_UNDEF; + max_fill_buffer = 0; + + peak_left = 0; + peak_right = 0; + } + + std::string& get_name(void) { return m_name; } + + /* This function is called for every frame read by + * the multiplexer + */ + void notifyBuffer(long bufsize) + { + boost::mutex::scoped_lock lock(m_mutex); + + // Statistics + if (bufsize > max_fill_buffer) { + max_fill_buffer = bufsize; + } + + if (bufsize < min_fill_buffer || + min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { + min_fill_buffer = bufsize; + } + + // State + m_buffer_empty = (bufsize == 0); + if (!m_buffer_empty) { + m_time_last_buffer_nonempty = time(NULL); + } + } + + void notifyPeakLevels(int peak_left, int peak_right) + { + boost::mutex::scoped_lock lock(m_mutex); + + // Statistics + if (peak_left > this->peak_left) { + this->peak_left = peak_left; + } + + if (peak_right > this->peak_right) { + this->peak_right = peak_right; + } + + // State + + // using the smallest of the two channels + // allows us to detect if only one channel + // is silent. + int minpeak = peak_left < peak_right ? peak_left : peak_right; + + const int16_t int16_max = std::numeric_limits::max(); + int peak_dB = minpeak ? + round(20*log10((double)minpeak / int16_max)) : + -90; + + if (peak_dB < INPUT_AUDIO_LEVEL_THRESHOLD) { + if (m_silence_counter < INPUT_AUDIO_LEVEL_COUNT_SATURATION) { + m_silence_counter++; + } + } + else { + if (m_silence_counter > 0) { + m_silence_counter--; + } + } + } + + void notifyUnderrun(void) + { + boost::mutex::scoped_lock lock(m_mutex); + + // Statistics + num_underruns++; + + // State + m_time_last_event = time(NULL); + if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { + m_glitch_counter++; + } + } + + void notifyOverrun(void) + { + boost::mutex::scoped_lock lock(m_mutex); + + // Statistics + num_overruns++; + + // State + m_time_last_event = time(NULL); + if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { + m_glitch_counter++; + } + } + + std::string encodeValuesJSON(void); + + std::string encodeStateJSON(void); + + input_state_t determineState(void); + + private: + std::string m_name; + + /************ STATISTICS ***********/ + // minimum and maximum buffer fill since last reset + long min_fill_buffer; + long max_fill_buffer; + + // counter of number of overruns and underruns since startup + uint32_t num_underruns; + uint32_t num_overruns; + + // peak audio levels (linear 16-bit PCM) for the two channels + int peak_left; + int peak_right; + + /************* STATE ***************/ + /* Variables used for determining the input state */ + int m_glitch_counter; // saturating counter + int m_silence_counter; // saturating counter + time_t m_time_last_event; + time_t m_time_last_buffer_nonempty; + bool m_buffer_empty; + + // The mutex that has to be held during all notify and readout + mutable boost::mutex m_mutex; + +}; + +class ManagementServer +{ + public: + ManagementServer() : + m_listenport(0), + m_running(false), + m_fault(false), + m_pending(false) + { } + + ManagementServer(int listen_port) : + m_listenport(listen_port), + m_running(false), + m_fault(false), + m_thread(&ManagementServer::serverThread, this), + m_pending(false) + { + m_sock = 0; + } + + ~ManagementServer() + { + m_running = false; + m_fault = false; + m_pending = false; + if (m_sock) { + close(m_sock); + } + m_thread.join(); + } + + /* Un-/Register a statistics data source */ + void registerInput(InputStat* is); + void unregisterInput(std::string id); + + /* Ask if there is a configuration request pending */ + bool request_pending() { return m_pending; } + + /* Update the copy of the configuration property tree and notify the + * update to the internal server thread. + */ + void update_ptree(const boost::property_tree::ptree& pt) { + if (m_running) { + boost::unique_lock lock(m_configmutex); + m_pt = pt; + m_pending = false; + + m_condition.notify_one(); + } + } + + bool fault_detected() { return m_fault; } + void restart(void); + + private: + void restart_thread(long); + + /******* TCP Socket Server ******/ + // no copying (because of the thread) + ManagementServer(const ManagementServer& other); + + void serverThread(void); + + bool isInputRegistered(std::string& id); + + int m_listenport; + + // serverThread runs in a separate thread + bool m_running; + bool m_fault; + boost::thread m_thread; + boost::thread m_restarter_thread; + + int m_sock; + + /******* Statistics Data ********/ + std::map m_inputStats; + + /* Return a description of the configuration that will + * allow to define what graphs to be created + * + * returns: a JSON encoded configuration + */ + std::string getStatConfigJSON(); + + /* Return the values for the statistics as defined in the configuration + * + * returns: JSON encoded statistics + */ + std::string getValuesJSON(); + + /* Return the state of each input + * + * returns: JSON encoded state + */ + std::string getStateJSON(); + + // mutex for accessing the map + mutable boost::mutex m_statsmutex; + + /******** Configuration Data *******/ + bool m_pending; + boost::condition_variable m_condition; + mutable boost::mutex m_configmutex; + + boost::property_tree::ptree m_pt; +}; + +extern ManagementServer* mgmt_server; + +#endif + diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp deleted file mode 100644 index 558a479..0000000 --- a/src/StatsServer.cpp +++ /dev/null @@ -1,409 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2014 Matthias P. Braendli - http://mpb.li - - A TCP Socket server that serves state information and statistics for - monitoring purposes. - - This server is very easy to integrate with munin http://munin-monitoring.org/ - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include "StatsServer.h" -#include "Log.h" - -void StatsServer::registerInput(InputStat* is) -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::string id(is->get_name()); - - if (m_inputStats.count(id) == 1) { - etiLog.level(error) << - "Double registration in Stats Server with id '" << - id << "'"; - return; - } - - m_inputStats[id] = is; -} - -void StatsServer::unregisterInput(std::string id) -{ - boost::mutex::scoped_lock lock(m_mutex); - - if (m_inputStats.count(id) == 1) { - m_inputStats.erase(id); - } -} - - -bool StatsServer::isInputRegistered(std::string& id) -{ - boost::mutex::scoped_lock lock(m_mutex); - - if (m_inputStats.count(id) == 0) { - etiLog.level(error) << - "Stats Server id '" << - id << "' does was not registered"; - return false; - } - return true; -} - -std::string StatsServer::getConfigJSON() -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::ostringstream ss; - ss << "{ \"config\" : [\n"; - - std::map::iterator iter; - int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); - ++iter, i++) - { - std::string id = iter->first; - - if (i > 0) { - ss << ", "; - } - - ss << " \"" << id << "\" "; - } - - ss << "] }\n"; - - return ss.str(); -} - -std::string StatsServer::getValuesJSON() -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::ostringstream ss; - ss << "{ \"values\" : {\n"; - - std::map::iterator iter; - int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); - ++iter, i++) - { - const std::string& id = iter->first; - InputStat* stats = iter->second; - - if (i > 0) { - ss << " ,\n"; - } - - ss << " \"" << id << "\" : "; - ss << stats->encodeValuesJSON(); - stats->reset(); - } - - ss << "}\n}\n"; - - return ss.str(); -} - -std::string StatsServer::getStateJSON() -{ - boost::mutex::scoped_lock lock(m_mutex); - - std::ostringstream ss; - ss << "{\n"; - - std::map::iterator iter; - int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); - ++iter, i++) - { - const std::string& id = iter->first; - InputStat* stats = iter->second; - - if (i > 0) { - ss << " ,\n"; - } - - ss << " \"" << id << "\" : "; - ss << stats->encodeStateJSON(); - stats->reset(); - } - - ss << "}\n"; - - return ss.str(); -} - -void StatsServer::restart() -{ - m_restarter_thread = boost::thread(&StatsServer::restart_thread, - this, 0); -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void StatsServer::restart_thread(long) -{ - m_running = false; - - if (m_listenport) { - m_thread.interrupt(); - m_thread.join(); - } - - m_thread = boost::thread(&StatsServer::serverThread, this); -} - -void StatsServer::serverThread() -{ - m_fault = false; - - try { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "{ \"service\": \"" - "%s %s Stats Server\" }\n", - PACKAGE_NAME, -#if defined(GITVERSION) - GITVERSION -#else - PACKAGE_VERSION -#endif - ); - - - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << "Error opening Stats Server socket: " << - strerror(errno); - m_fault = true; - return; - } - - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); - - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << "Error binding Stats Server socket: " << - strerror(errno); - goto end_serverthread; - } - - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << "Error listening on Stats Server socket: " << - strerror(errno); - goto end_serverthread; - } - - m_running = true; - - while (m_running) { - socklen_t cli_addr_len = sizeof(cli_addr); - - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, - (struct sockaddr *)&cli_addr, - &cli_addr_len); - - if (accepted_sock < 0) { - etiLog.level(warn) << "Stats Server cound not accept connection: " << - strerror(errno); - continue; - } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } - - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << "Error reading from Stats Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } - - if (strcmp(buffer, "config\n") == 0) { - std::string json = getConfigJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "values\n") == 0) { - std::string json = getValuesJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "state\n") == 0) { - std::string json = getStateJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else { - int len = snprintf(buffer, 256, "Invalid command\n"); - n = write(accepted_sock, buffer, len); - } - - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << - strerror(errno); - } - close(accepted_sock); - } - -end_serverthread: - m_fault = true; - close(m_sock); - - } - catch (std::exception& e) { - etiLog.level(error) << "Statistics server caught exception: " << e.what(); - m_fault = true; - } -} - -/************************************************/ - -void InputStat::registerAtServer() -{ - global_stats->registerInput(this); -} - -InputStat::~InputStat() -{ - global_stats->unregisterInput(m_name); -} - -std::string InputStat::encodeValuesJSON() -{ - std::ostringstream ss; - - const int16_t int16_max = std::numeric_limits::max(); - - boost::mutex::scoped_lock lock(m_mutex); - - /* convert to dB */ - int dB_l = peak_left ? round(20*log10((double)peak_left / int16_max)) : -90; - int dB_r = peak_right ? round(20*log10((double)peak_right / int16_max)) : -90; - - ss << - "{ \"inputstat\" : {" - "\"min_fill\": " << min_fill_buffer << ", " - "\"max_fill\": " << max_fill_buffer << ", " - "\"peak_left\": " << dB_l << ", " - "\"peak_right\": " << dB_r << ", " - "\"num_underruns\": " << num_underruns << ", " - "\"num_overruns\": " << num_overruns << - " } }"; - - return ss.str(); -} - -std::string InputStat::encodeStateJSON() -{ - std::ostringstream ss; - - ss << "{ \"state\" : "; - - switch (determineState()) { - case NoData: - ss << "\"NoData\""; - break; - case Unstable: - ss << "\"Unstable\""; - break; - case Silence: - ss << "\"Silent\""; - break; - case Streaming: - ss << "\"Streaming\""; - break; - default: - ss << "\"Unknown\""; - } - - ss << " }"; - - return ss.str(); -} - -input_state_t InputStat::determineState(void) -{ - boost::mutex::scoped_lock lock(m_mutex); - - time_t now = time(NULL); - input_state_t state; - - /* if the last event was more that INPUT_COUNTER_RESET_TIME - * minutes ago, the timeout has expired. We can reset our - * glitch counter. - */ - if (now - m_time_last_event > 60*INPUT_COUNTER_RESET_TIME) { - m_glitch_counter = 0; - } - - // STATE CALCULATION - - /* If the buffer has been empty for more than - * INPUT_NODATA_TIMEOUT, we go to the NoData state. - */ - if (m_buffer_empty && - now - m_time_last_buffer_nonempty > INPUT_NODATA_TIMEOUT) { - state = NoData; - } - - /* Otherwise, the state depends on the glitch counter */ - else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) { - state = Unstable; - } - else { - /* The input is streaming, check if the audio level is too low */ - - if (m_silence_counter > INPUT_AUDIO_LEVEL_SILENCE_COUNT) { - state = Silence; - } - else { - state = Streaming; - } - } - - return state; -} - diff --git a/src/StatsServer.h b/src/StatsServer.h deleted file mode 100644 index 26f90d4..0000000 --- a/src/StatsServer.h +++ /dev/null @@ -1,392 +0,0 @@ -/* - Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2014 Matthias P. Braendli - http://mpb.li - - A TCP Socket server that serves state information and statistics for - monitoring purposes. - - This server is very easy to integrate with munin - http://munin-monitoring.org/ - but is not specific to it. - - The TCP Server responds in JSON, and accepts two commands: - - config - - values - Inspired by the munin equivalent - - - state - Returns the state of each input - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#ifndef __STATS_SERVER_H -#define __STATS_SERVER_H - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define MIN_FILL_BUFFER_UNDEF (-1) - -/*** State handing ***/ -/* An input can be in one of the following three states: - */ -enum input_state_t -{ - /* The input is waiting for data, all buffers are empty */ - NoData, - - /* The input is running, but has seen many underruns or overruns recently */ - Unstable, - - /* The input is running, but the audio level is too low, or has - * been too low recently - */ - Silence, - - /* The input is running stable */ - Streaming -}; - - -/* The delay after which the glitch counter is reset - */ -#define INPUT_COUNTER_RESET_TIME 30 // minutes - -/* How many glitches we tolerate in Streaming state before - * we consider the input Unstable - */ -#define INPUT_UNSTABLE_THRESHOLD 3 - -/* For how long the input buffers must be empty before we move an input to the - * NoData state. - */ -#define INPUT_NODATA_TIMEOUT 30 // seconds - -/* For silence detection, we count the number of occurrences the audio level - * falls below a threshold. - * - * The counter is decreased for each frame that has good audio level. - * - * The counter saturates, and this value defines for how long the - * input will be considered silent after a cut. - * - * If the count reaches a certain value, the input changes state - * to Silence. - */ -#define INPUT_AUDIO_LEVEL_THRESHOLD -50 // dB -#define INPUT_AUDIO_LEVEL_SILENCE_COUNT 100 // superframes (120ms) -#define INPUT_AUDIO_LEVEL_COUNT_SATURATION 500 // superframes (120ms) - -/* An example of how the state changes work. - * The timeout is set to expire in 30 minutes - * at each under-/overrun. - * - * The glitch counter is increased by one for each glitch (can be a - * saturating counter), and set to zero when the counter timeout expires. - * - * The state is then simply depending on the glitch counter value. - * - * Graphical example: - - state STREAMING | UNSTABLE | STREAMING - xruns U U U - glitch - counter 0 1 2 3 0 - reset - timeout \ |\ |\ |\ - \ | \ | \ | \ - \ | \ | \ | \ - \| \ | \| \ - ` \| ` \ - ` \ - \ - \ - \ - \ - timeout expires ___________________\ - <--30min--> - */ - -/* InputStat takes care of - * - saving the statistics for graphing - * - calculating the state of the input for monitoring - */ -class InputStat -{ - public: - InputStat(std::string name) : m_name(name) - { - /* Statistics */ - num_underruns = 0; - num_overruns = 0; - - /* State handling */ - time_t now = time(NULL); - m_time_last_event = now; - m_time_last_buffer_nonempty = 0; - m_buffer_empty = true; - m_glitch_counter = 0; - m_silence_counter = 0; - - reset(); - } - - void registerAtServer(void); - - ~InputStat(); - - // Gets called each time the statistics are transmitted, - // and resets the counters to zero - void reset(void) - { - min_fill_buffer = MIN_FILL_BUFFER_UNDEF; - max_fill_buffer = 0; - - peak_left = 0; - peak_right = 0; - } - - std::string& get_name(void) { return m_name; } - - /* This function is called for every frame read by - * the multiplexer - */ - void notifyBuffer(long bufsize) - { - boost::mutex::scoped_lock lock(m_mutex); - - // Statistics - if (bufsize > max_fill_buffer) { - max_fill_buffer = bufsize; - } - - if (bufsize < min_fill_buffer || - min_fill_buffer == MIN_FILL_BUFFER_UNDEF) { - min_fill_buffer = bufsize; - } - - // State - m_buffer_empty = (bufsize == 0); - if (!m_buffer_empty) { - m_time_last_buffer_nonempty = time(NULL); - } - } - - void notifyPeakLevels(int peak_left, int peak_right) - { - boost::mutex::scoped_lock lock(m_mutex); - - // Statistics - if (peak_left > this->peak_left) { - this->peak_left = peak_left; - } - - if (peak_right > this->peak_right) { - this->peak_right = peak_right; - } - - // State - - // using the smallest of the two channels - // allows us to detect if only one channel - // is silent. - int minpeak = peak_left < peak_right ? peak_left : peak_right; - - const int16_t int16_max = std::numeric_limits::max(); - int peak_dB = minpeak ? - round(20*log10((double)minpeak / int16_max)) : - -90; - - if (peak_dB < INPUT_AUDIO_LEVEL_THRESHOLD) { - if (m_silence_counter < INPUT_AUDIO_LEVEL_COUNT_SATURATION) { - m_silence_counter++; - } - } - else { - if (m_silence_counter > 0) { - m_silence_counter--; - } - } - } - - void notifyUnderrun(void) - { - boost::mutex::scoped_lock lock(m_mutex); - - // Statistics - num_underruns++; - - // State - m_time_last_event = time(NULL); - if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { - m_glitch_counter++; - } - } - - void notifyOverrun(void) - { - boost::mutex::scoped_lock lock(m_mutex); - - // Statistics - num_overruns++; - - // State - m_time_last_event = time(NULL); - if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { - m_glitch_counter++; - } - } - - std::string encodeValuesJSON(void); - - std::string encodeStateJSON(void); - - input_state_t determineState(void); - - private: - std::string m_name; - - /************ STATISTICS ***********/ - // minimum and maximum buffer fill since last reset - long min_fill_buffer; - long max_fill_buffer; - - // counter of number of overruns and underruns since startup - uint32_t num_underruns; - uint32_t num_overruns; - - // peak audio levels (linear 16-bit PCM) for the two channels - int peak_left; - int peak_right; - - /************* STATE ***************/ - /* Variables used for determining the input state */ - int m_glitch_counter; // saturating counter - int m_silence_counter; // saturating counter - time_t m_time_last_event; - time_t m_time_last_buffer_nonempty; - bool m_buffer_empty; - - // The mutex that has to be held during all notify and readout - mutable boost::mutex m_mutex; - -}; - -class StatsServer -{ - public: - StatsServer() : - m_listenport(0), - m_running(false), - m_fault(false) - { } - - StatsServer(int listen_port) : - m_listenport(listen_port), - m_running(false), - m_fault(false), - m_thread(&StatsServer::serverThread, this) - { - m_sock = 0; - } - - ~StatsServer() - { - m_running = false; - m_fault = false; - if (m_sock) { - close(m_sock); - } - m_thread.join(); - } - - void registerInput(InputStat* is); - void unregisterInput(std::string id); - - bool fault_detected() { return m_fault; } - void restart(void); - - private: - void restart_thread(long); - - /******* TCP Socket Server ******/ - // no copying (because of the thread) - StatsServer(const StatsServer& other); - - void serverThread(void); - - bool isInputRegistered(std::string& id); - - int m_listenport; - - // serverThread runs in a separate thread - bool m_running; - bool m_fault; - boost::thread m_thread; - boost::thread m_restarter_thread; - - int m_sock; - - /******* Statistics Data ********/ - std::map m_inputStats; - - /* Return a description of the configuration that will - * allow to define what graphs to be created - * - * returns: a JSON encoded configuration - */ - std::string getConfigJSON(); - - /* Return the values for the statistics as defined in the configuration - * - * returns: JSON encoded statistics - */ - std::string getValuesJSON(); - - /* Return the state of each input - * - * returns: JSON encoded state - */ - std::string getStateJSON(); - - // mutex for accessing the map - mutable boost::mutex m_mutex; -}; - - -extern StatsServer* global_stats; - -#endif - diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 9dab6c9..9d39945 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -41,7 +41,6 @@ #include "dabInput.h" #include "dabInputZmq.h" -#include "StatsServer.h" #include "PcDebug.h" #ifdef HAVE_CONFIG_H @@ -66,8 +65,6 @@ using namespace std; -extern StatsServer* global_stats; - int readkey(string& keyfile, char* key) { int fd = open(keyfile.c_str(), O_RDONLY); diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index fd6c599..d13029d 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -55,7 +55,7 @@ #include #include "zmq.hpp" #include "dabInput.h" -#include "StatsServer.h" +#include "ManagementServer.h" /* The frame_buffer contains DAB logical frames as defined in * TS 102 563, section 6. -- cgit v1.2.3