diff options
-rwxr-xr-x | doc/stats_dabmux_multi.py | 26 | ||||
-rwxr-xr-x | doc/zmq_remote.py | 48 | ||||
-rw-r--r-- | src/ManagementServer.cpp | 60 | ||||
-rw-r--r-- | src/ManagementServer.h | 23 |
4 files changed, 67 insertions, 90 deletions
diff --git a/doc/stats_dabmux_multi.py b/doc/stats_dabmux_multi.py index 664f2f1..ff6f0a4 100755 --- a/doc/stats_dabmux_multi.py +++ b/doc/stats_dabmux_multi.py @@ -7,10 +7,13 @@ import sys import json import zmq import os +import re config_top = """ """ +#default data type is GAUGE + config_template = """ multigraph buffers_{ident} graph_title Contribution {ident} buffer @@ -68,6 +71,19 @@ right.min -90 right.max 0 right.warning -40:0 right.critical -80:0 + +multigraph state_{ident} +graph_title State of contribution {ident} +graph_order state +graph_args --base 1000 --lower-limit 0 --upper-limit 5 +graph_vlabel Current state of the input +graph_category dabmux +graph_info This graph shows the state for the {ident} ZMQ input + +state.info Input state +state.label 0 Unknown, 1 NoData, 2 Unstable, 3 Silent, 4 Streaming +state.warning 4:4 +state.critical 2:4 """ ctx = zmq.Context() @@ -93,6 +109,8 @@ def connect(): return sock +re_state = re.compile(r"\w+ (\d+)") + if len(sys.argv) == 1: sock = connect() sock.send("values") @@ -111,6 +129,14 @@ if len(sys.argv) == 1: munin_values += "left.value {}\n".format(v['peak_left']) munin_values += "right.value {}\n".format(v['peak_right']) + if 'state' in v: + # If ODR-DabMux is v1.3.1-3 or older, it doesn't export state + match = re_state.match(v['state']) + if match: + munin_values += "state.value {}\n".format(match.group(1)) + else: + print("Cannot parse state '{}'".format(v['state'])) + print(munin_values) elif len(sys.argv) == 2 and sys.argv[1] == "config": diff --git a/doc/zmq_remote.py b/doc/zmq_remote.py index 155390b..47da520 100755 --- a/doc/zmq_remote.py +++ b/doc/zmq_remote.py @@ -12,6 +12,9 @@ context = zmq.Context() sock = context.socket(zmq.REQ) +poller = zmq.Poller() +poller.register(sock, zmq.POLLIN) + if len(sys.argv) < 2: print("Usage: program url cmd [args...]") sys.exit(1) @@ -22,34 +25,37 @@ message_parts = sys.argv[2:] # first do a ping test -print("Send ping") -sock.send("ping".encode()) -data = sock.recv_multipart() +print("ping") +sock.send(b"ping") -if len(data) != 1: - print("Received invalid number of parts: {}".format(len(data))) - for i,part in enumerate(data): - print(" {}".format(part)) - sys.exit(1) +socks = dict(poller.poll(1000)) +if socks: + if socks.get(sock) == zmq.POLLIN: -if data[0] != b'ok': - print("Received invalid ping response: {}".format(data.decode())) - sys.exit(1) + data = sock.recv_multipart() + print("Received: {}".format(len(data))) + for i,part in enumerate(data): + print(" {}".format(part)) + + for i, part in enumerate(message_parts): + if i == len(message_parts) - 1: + f = 0 + else: + f = zmq.SNDMORE -print("Ping ok, sending request '{}'...".format(" ".join(message_parts))) + print("Send {}({}): '{}'".format(i, f, part)) -for i, part in enumerate(message_parts): - if i == len(message_parts) - 1: - f = 0 - else: - f = zmq.SNDMORE + sock.send(part.encode(), flags=f) - sock.send(part.encode(), flags=f) + data = sock.recv_multipart() -data = sock.recv_multipart() + print("Received: {}".format(len(data))) + for i,part in enumerate(data): + print(" RX {}: {}".format(i, part)) -print("Received {} entries:".format(len(data))) -print(" " + " ".join([d.decode() for d in data])) +else: + print("ZMQ error: timeout") + context.destroy(linger=5) # This is free and unencumbered software released into the public domain. # diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index c44738a..3300c89 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -146,35 +146,6 @@ std::string ManagementServer::getValuesJSON() return ss.str(); } -std::string ManagementServer::getStateJSON() -{ - boost::mutex::scoped_lock lock(m_statsmutex); - - std::ostringstream ss; - ss << "{\n"; - - std::map<std::string,InputStat*>::iterator iter; - int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); - ++iter, i++) - { - const std::string& id = iter->first; - InputStat* stats = iter->second; - - if (i > 0) { - ss << " ,\n"; - } - - ss << " \"" << id << "\" : "; - ss << stats->encodeStateJSON(); - stats->reset(); - } - - ss << "}\n"; - - return ss.str(); -} - void ManagementServer::restart() { m_restarter_thread = boost::thread(&ManagementServer::restart_thread, @@ -244,9 +215,6 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message) else if (data == "values") { answer << getValuesJSON(); } - else if (data == "state") { - answer << getStateJSON(); - } else if (data == "getptree") { boost::unique_lock<boost::mutex> lock(m_configmutex); boost::property_tree::json_parser::write_json(answer, m_pt); @@ -305,44 +273,34 @@ std::string InputStat::encodeValuesJSON() "\"peak_left\": " << dB_l << ", " "\"peak_right\": " << dB_r << ", " "\"num_underruns\": " << num_underruns << ", " - "\"num_overruns\": " << num_overruns << - " } }"; + "\"num_overruns\": " << num_overruns << ", "; - return ss.str(); -} - -std::string InputStat::encodeStateJSON() -{ - std::ostringstream ss; - - ss << "{ \"state\" : "; + ss << "\"state\": "; switch (determineState()) { case NoData: - ss << "\"NoData\""; + ss << "\"NoData (1)\""; break; case Unstable: - ss << "\"Unstable\""; + ss << "\"Unstable (2)\""; break; case Silence: - ss << "\"Silent\""; + ss << "\"Silent (3)\""; break; case Streaming: - ss << "\"Streaming\""; + ss << "\"Streaming (4)\""; break; default: - ss << "\"Unknown\""; + ss << "\"Unknown (0)\""; } - ss << " }"; + ss << " } }"; return ss.str(); } input_state_t InputStat::determineState() { - boost::mutex::scoped_lock lock(m_mutex); - time_t now = time(nullptr); input_state_t state; diff --git a/src/ManagementServer.h b/src/ManagementServer.h index aade3d0..e75aed3 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -18,13 +18,11 @@ The responds in JSON, and accepts the commands: - config - values - Inspired by the munin equivalent - - - state - Returns the state of each input + Inspired by the munin equivalent, returns the configuration + and the statistics values for every exported stat. - getptree - Returns the internal boost property_tree that contains the + Returns the internal boost property_tree that contains the multiplexer configuration DB. The server is using REQ/REP ZeroMQ sockets. @@ -46,8 +44,7 @@ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __MANAGEMENT_SERVER_H -#define __MANAGEMENT_SERVER_H +#pragma once #ifdef HAVE_CONFIG_H # include "config.h" @@ -278,8 +275,6 @@ class InputStat std::string encodeValuesJSON(void); - std::string encodeStateJSON(void); - input_state_t determineState(void); private: @@ -394,12 +389,6 @@ class ManagementServer */ std::string getValuesJSON(); - /* Return the state of each input - * - * returns: JSON encoded state - */ - std::string getStateJSON(); - // mutex for accessing the map boost::mutex m_statsmutex; @@ -412,5 +401,3 @@ class ManagementServer // a reference to it ManagementServer& get_mgmt_server(); -#endif - |