/* Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org A TCP Socket server that serves state information and statistics for monitoring purposes, and also serves the internal configuration property tree. */ /* This file is part of ODR-DabMux. ODR-DabMux is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ODR-DabMux is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ODR-DabMux. If not, see . */ #include #include #include #include #include #include #include #include #include "ManagementServer.h" #include "Log.h" using namespace std; #define MIN_FILL_BUFFER_UNDEF (-1) /* For silence detection, we count the number of occurrences the audio level * falls below a threshold. * * The counter is decreased for each frame that has good audio level. * * The counter saturates, and this value defines for how long the * input will be considered silent after a cut. * * If the count reaches a certain value, the input changes state * to Silence. */ #define INPUT_AUDIO_LEVEL_THRESHOLD -50 // dB #define INPUT_AUDIO_LEVEL_SILENCE_COUNT 100 // superframes (120ms) #define INPUT_AUDIO_LEVEL_COUNT_SATURATION 500 // superframes (120ms) /* An example of how the state changes work. * The timeout is set to expire in 30 minutes * at each under-/overrun. * * The glitch counter is increased by one for each glitch (can be a * saturating counter), and set to zero when the counter timeout expires. * * The state is then simply depending on the glitch counter value. * * Graphical example: state STREAMING | UNSTABLE | STREAMING xruns U U U glitch counter 0 1 2 3 0 reset timeout \ |\ |\ |\ \ | \ | \ | \ \ | \ | \ | \ \| \ | \| \ ` \| ` \ ` \ \ \ \ \ timeout expires ___________________\ <--30min--> */ /* The delay after which the glitch counter is reset */ static constexpr auto INPUT_COUNTER_RESET_TIME = std::chrono::minutes(30); /* How many glitches we tolerate in Streaming state before * we consider the input Unstable */ static constexpr int INPUT_UNSTABLE_THRESHOLD = 3; /* For how long the input buffers must be empty before we move an input to the * NoData state. */ static constexpr auto INPUT_NODATA_TIMEOUT = std::chrono::seconds(30); /* Keep 30s of min/max buffer fill information so that we can catch meaningful * values even if we have a slow poller */ static constexpr auto BUFFER_STATS_KEEP_DURATION = std::chrono::seconds(30); /* Audio level information changes faster than buffer levels, so it makes sense * to poll much faster. If we take the peak over too much data, we will hide * the interesting short-time fluctuations. At the same time, we want to have a * statistic that also catches the rare peaks, for slow pollers. */ static constexpr auto PEAK_STATS_SHORT_WINDOW = std::chrono::milliseconds(500); static constexpr auto PEAK_STATS_KEEP_DURATION = std::chrono::minutes(5); ManagementServer& get_mgmt_server() { static ManagementServer mgmt_server; return mgmt_server; /* Warning, do not use the mgmt_server in the destructor * of another global object: you don't know which one * gets destroyed first */ } void ManagementServer::registerInput(InputStat* is) { unique_lock lock(m_statsmutex); std::string id(is->get_name()); if (m_inputStats.count(id) == 1) { etiLog.level(error) << "Double registration in MGMT Server with id '" << id << "'"; return; } m_inputStats[id] = is; } void ManagementServer::unregisterInput(std::string id) { unique_lock lock(m_statsmutex); if (m_inputStats.count(id) == 1) { m_inputStats.erase(id); } } bool ManagementServer::isInputRegistered(std::string& id) { unique_lock lock(m_statsmutex); if (m_inputStats.count(id) == 0) { etiLog.level(error) << "Management Server: id '" << id << "' does was not registered"; return false; } return true; } std::string ManagementServer::getStatConfigJSON() { unique_lock lock(m_statsmutex); std::ostringstream ss; ss << "{ \"config\" : [\n"; std::map::iterator iter; int i = 0; for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter, i++) { std::string id = iter->first; if (i > 0) { ss << ", "; } ss << " \"" << id << "\" "; } ss << "] }\n"; return ss.str(); } std::string ManagementServer::getValuesJSON() { unique_lock lock(m_statsmutex); std::ostringstream ss; ss << "{ \"values\" : {\n"; std::map::iterator iter; int i = 0; for(iter = m_inputStats.begin(); iter != m_inputStats.end(); ++iter, i++) { const std::string& id = iter->first; InputStat* stats = iter->second; if (i > 0) { ss << " ,\n"; } ss << " \"" << id << "\" : "; ss << stats->encodeValuesJSON(); } ss << "}\n}\n"; return ss.str(); } ManagementServer::ManagementServer() : m_zmq_context(), m_zmq_sock(m_zmq_context, ZMQ_REP), m_running(false), m_fault(false) { } ManagementServer::~ManagementServer() { m_running = false; if (m_thread.joinable()) { m_thread.join(); m_fault = false; } } void ManagementServer::open(int listenport) { m_listenport = listenport; if (m_listenport > 0) { m_thread = std::thread(&ManagementServer::serverThread, this); } } void ManagementServer::restart() { m_restarter_thread = thread(&ManagementServer::restart_thread, this, 0); } // This runs in a separate thread, because // it would take too long to be done in the main loop // thread. void ManagementServer::restart_thread(long) { m_running = false; if (m_thread.joinable()) { m_thread.join(); m_fault = false; } m_thread = thread(&ManagementServer::serverThread, this); } void ManagementServer::serverThread() { m_running = true; m_fault = false; try { std::string bind_addr = "tcp://127.0.0.1:" + to_string(m_listenport); m_zmq_sock.bind(bind_addr.c_str()); zmq::pollitem_t pollItems[] = { {m_zmq_sock, 0, ZMQ_POLLIN, 0} }; while (m_running) { zmq::poll(pollItems, 1, 1000); if (pollItems[0].revents & ZMQ_POLLIN) { zmq::message_t zmq_message; m_zmq_sock.recv(&zmq_message); handle_message(zmq_message); } } } catch (const exception &e) { etiLog.level(error) << "Exception in ManagementServer: " << e.what(); } m_fault = true; } void ManagementServer::handle_message(zmq::message_t& zmq_message) { std::stringstream answer; std::string data((char*)zmq_message.data(), zmq_message.size()); try { if (data == "info") { answer << "{ " << "\"service\": \"" << PACKAGE_NAME << " " << #if defined(GITVERSION) GITVERSION << #else PACKAGE_VERSION << #endif " MGMT Server\", " << "\"version\": \"" << #if defined(GITVERSION) GITVERSION << #else PACKAGE_VERSION << #endif "\" " << "}\n"; } else if (data == "config") { answer << getStatConfigJSON(); } else if (data == "values") { answer << getValuesJSON(); } else if (data == "getptree") { unique_lock lock(m_configmutex); boost::property_tree::json_parser::write_json(answer, m_pt); } else { etiLog.level(warn) << "ManagementServer: Invalid request '" << data << "'"; answer << "Invalid command"; } std::string answerstr(answer.str()); m_zmq_sock.send(answerstr.c_str(), answerstr.size()); } catch (const std::exception& e) { etiLog.level(error) << "MGMT server caught exception: " << e.what(); } } void ManagementServer::update_ptree(const boost::property_tree::ptree& pt) { if (m_running) { unique_lock lock(m_configmutex); m_pt = pt; } } /************************************************/ InputStat::InputStat(const std::string& name) : m_name(name), m_time_last_event(std::chrono::steady_clock::now()) { } InputStat::~InputStat() { get_mgmt_server().unregisterInput(m_name); } void InputStat::registerAtServer() { get_mgmt_server().registerInput(this); } void InputStat::notifyBuffer(long bufsize) { unique_lock lock(m_mutex); using namespace std::chrono; const auto time_now = steady_clock::now(); m_buffer_fill_stats.push_front({time_now, bufsize}); prune_statistics(time_now); } void InputStat::notifyPeakLevels(int peak_left, int peak_right) { unique_lock lock(m_mutex); using namespace std::chrono; const auto time_now = steady_clock::now(); m_peak_stats.push_front({time_now, peak_left, peak_right}); prune_statistics(time_now); if (m_peak_stats.size() >= 2) { // Calculate the peak over the short window vector short_peaks; copy_if(m_peak_stats.begin(), m_peak_stats.end(), back_inserter(short_peaks), [&](const peak_stat_t& ps) { return ps.timestamp + PEAK_STATS_SHORT_WINDOW >= time_now; }); const auto& short_left_peak_max = max_element( short_peaks.begin(), short_peaks.end(), [](const peak_stat_t& lhs, const peak_stat_t& rhs) { return lhs.peak_left < rhs.peak_left; }); const auto& short_right_peak_max = max_element( short_peaks.begin(), short_peaks.end(), [](const peak_stat_t& lhs, const peak_stat_t& rhs) { return lhs.peak_right < rhs.peak_right; }); // Using the lower of the two channels allows us to detect if only one // channel is silent. const int lower_peak = min( short_left_peak_max->peak_left, short_right_peak_max->peak_right); // State const int16_t int16_max = std::numeric_limits::max(); int peak_dB = lower_peak ? round(20*log10((double)lower_peak / int16_max)) : -90; if (peak_dB < INPUT_AUDIO_LEVEL_THRESHOLD) { if (m_silence_counter < INPUT_AUDIO_LEVEL_COUNT_SATURATION) { m_silence_counter++; } } else { if (m_silence_counter > 0) { m_silence_counter--; } } } } void InputStat::notifyUnderrun(void) { unique_lock lock(m_mutex); // Statistics m_num_underruns++; // State m_time_last_event = std::chrono::steady_clock::now(); if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { m_glitch_counter++; } else { // As we don't receive level notifications anymore, clear the // audio level information m_peak_stats.clear(); } } void InputStat::notifyOverrun(void) { unique_lock lock(m_mutex); // Statistics m_num_overruns++; // State m_time_last_event = std::chrono::steady_clock::now(); if (m_glitch_counter < INPUT_UNSTABLE_THRESHOLD) { m_glitch_counter++; } } void InputStat::notifyVersion(const std::string& version, uint32_t uptime_s) { unique_lock lock(m_mutex); m_version = version; m_uptime_s = uptime_s; } std::string InputStat::encodeValuesJSON() { std::ostringstream ss; const int16_t int16_max = std::numeric_limits::max(); unique_lock lock(m_mutex); int peak_left_short = 0; int peak_right_short = 0; int peak_left = 0; int peak_right = 0; if (not m_peak_stats.empty()) { peak_left = max_element(m_peak_stats.begin(), m_peak_stats.end(), [](const peak_stat_t& lhs, const peak_stat_t& rhs) { return lhs.peak_left < rhs.peak_left; })->peak_left; peak_right = max_element(m_peak_stats.begin(), m_peak_stats.end(), [](const peak_stat_t& lhs, const peak_stat_t& rhs) { return lhs.peak_right < rhs.peak_right; })->peak_right; if (m_peak_stats.size() > m_short_window_length) { peak_left_short = max_element(m_peak_stats.begin(), m_peak_stats.begin() + m_short_window_length, [](const peak_stat_t& lhs, const peak_stat_t& rhs) { return lhs.peak_left < rhs.peak_left; })->peak_left; peak_right_short = max_element(m_peak_stats.begin(), m_peak_stats.begin() + m_short_window_length, [](const peak_stat_t& lhs, const peak_stat_t& rhs) { return lhs.peak_right < rhs.peak_right; })->peak_right; } else { peak_left_short = peak_left; peak_right_short = peak_right; } } long min_fill_buffer = MIN_FILL_BUFFER_UNDEF; long max_fill_buffer = 0; if (not m_buffer_fill_stats.empty()) { const auto& buffer_min_max_fill = minmax_element( m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(), [](const fill_stat_t& lhs, const fill_stat_t& rhs) { return lhs.bufsize < rhs.bufsize; }); min_fill_buffer = buffer_min_max_fill.first->bufsize; max_fill_buffer = buffer_min_max_fill.second->bufsize; } /* convert to dB */ auto to_dB = [](int p) { int dB = -90; if (p) { dB = round(20*log10((double)p / int16_max)); } return dB; }; auto version = m_version; size_t pos = 0; while ((pos = version.find("\"", pos)) != std::string::npos) { version.replace(pos, 1, "\\\""); pos++; } ss << "{ \"inputstat\" : {" "\"min_fill\": " << min_fill_buffer << ", " "\"max_fill\": " << max_fill_buffer << ", " "\"peak_left\": " << to_dB(peak_left_short) << ", " "\"peak_right\": " << to_dB(peak_right_short) << ", " "\"peak_left_slow\": " << to_dB(peak_left) << ", " "\"peak_right_slow\": " << to_dB(peak_right) << ", " "\"num_underruns\": " << m_num_underruns << ", " "\"num_overruns\": " << m_num_overruns << ", " "\"version\": \"" << version << "\", " "\"uptime\": " << m_uptime_s << ", " ; ss << "\"state\": "; switch (determineState()) { case input_state_t::NoData: ss << "\"NoData (1)\""; break; case input_state_t::Unstable: ss << "\"Unstable (2)\""; break; case input_state_t::Silence: ss << "\"Silent (3)\""; break; case input_state_t::Streaming: ss << "\"Streaming (4)\""; break; } ss << " } }"; return ss.str(); } input_state_t InputStat::determineState() { const auto now = std::chrono::steady_clock::now(); prune_statistics(now); input_state_t state; /* if the last event was more that INPUT_COUNTER_RESET_TIME * ago, the timeout has expired. We can reset our * glitch counter. */ if (now - m_time_last_event > INPUT_COUNTER_RESET_TIME) { m_glitch_counter = 0; } // STATE CALCULATION /* If the buffer has been empty for more than * INPUT_NODATA_TIMEOUT, we go to the NoData state. * * Consider an empty deque to be NoData too. */ if (std::all_of( m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(), [](const fill_stat_t& fs) { return fs.bufsize == 0; }) ) { state = input_state_t::NoData; } /* Otherwise, the state depends on the glitch counter */ else if (m_glitch_counter >= INPUT_UNSTABLE_THRESHOLD) { state = input_state_t::Unstable; } else { /* The input is streaming, check if the audio level is too low */ if (m_silence_counter > INPUT_AUDIO_LEVEL_SILENCE_COUNT) { state = input_state_t::Silence; } else { state = input_state_t::Streaming; } } return state; } void InputStat::prune_statistics(const std::chrono::time_point& time_now) { // Keep only stats whose timestamp are more recent than // BUFFER_STATS_KEEP_DURATION ago m_buffer_fill_stats.erase(remove_if( m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(), [&](const fill_stat_t& fs) { return fs.timestamp + BUFFER_STATS_KEEP_DURATION < time_now; }), m_buffer_fill_stats.end()); // Keep only stats whose timestamp are more recent than // BUFFER_STATS_KEEP_DURATION ago m_peak_stats.erase(remove_if( m_peak_stats.begin(), m_peak_stats.end(), [&](const peak_stat_t& ps) { return ps.timestamp + PEAK_STATS_KEEP_DURATION < time_now; }), m_peak_stats.end()); }