diff options
| -rwxr-xr-x | doc/stats_dabmux_multi.py | 26 | ||||
| -rwxr-xr-x | doc/zmq_remote.py | 48 | ||||
| -rw-r--r-- | src/ManagementServer.cpp | 60 | ||||
| -rw-r--r-- | src/ManagementServer.h | 23 | 
4 files changed, 67 insertions, 90 deletions
| diff --git a/doc/stats_dabmux_multi.py b/doc/stats_dabmux_multi.py index 664f2f1..ff6f0a4 100755 --- a/doc/stats_dabmux_multi.py +++ b/doc/stats_dabmux_multi.py @@ -7,10 +7,13 @@ import sys  import json  import zmq  import os +import re  config_top = """  """ +#default data type is GAUGE +  config_template = """  multigraph buffers_{ident}  graph_title Contribution {ident} buffer @@ -68,6 +71,19 @@ right.min -90  right.max 0  right.warning -40:0  right.critical -80:0 + +multigraph state_{ident} +graph_title State of contribution {ident} +graph_order state +graph_args --base 1000 --lower-limit 0 --upper-limit 5 +graph_vlabel Current state of the input +graph_category dabmux +graph_info This graph shows the state for the {ident} ZMQ input + +state.info Input state +state.label 0 Unknown, 1 NoData, 2 Unstable, 3 Silent, 4 Streaming +state.warning 4:4 +state.critical 2:4  """  ctx = zmq.Context() @@ -93,6 +109,8 @@ def connect():      return sock +re_state = re.compile(r"\w+ (\d+)") +  if len(sys.argv) == 1:      sock = connect()      sock.send("values") @@ -111,6 +129,14 @@ if len(sys.argv) == 1:          munin_values += "left.value {}\n".format(v['peak_left'])          munin_values += "right.value {}\n".format(v['peak_right']) +        if 'state' in v: +            # If ODR-DabMux is v1.3.1-3 or older, it doesn't export state +            match = re_state.match(v['state']) +            if match: +                munin_values += "state.value {}\n".format(match.group(1)) +            else: +                print("Cannot parse state '{}'".format(v['state'])) +      print(munin_values)  elif len(sys.argv) == 2 and sys.argv[1] == "config": diff --git a/doc/zmq_remote.py b/doc/zmq_remote.py index 155390b..47da520 100755 --- a/doc/zmq_remote.py +++ b/doc/zmq_remote.py @@ -12,6 +12,9 @@ context = zmq.Context()  sock = context.socket(zmq.REQ) +poller = zmq.Poller() +poller.register(sock, zmq.POLLIN) +  if len(sys.argv) < 2:      print("Usage: program url cmd [args...]")      sys.exit(1) @@ -22,34 +25,37 @@ message_parts = sys.argv[2:]  # first do a ping test -print("Send ping") -sock.send("ping".encode()) -data = sock.recv_multipart() +print("ping") +sock.send(b"ping") -if len(data) != 1: -    print("Received invalid number of parts: {}".format(len(data))) -    for i,part in enumerate(data): -        print("   {}".format(part)) -    sys.exit(1) +socks = dict(poller.poll(1000)) +if socks: +    if socks.get(sock) == zmq.POLLIN: -if data[0] != b'ok': -    print("Received invalid ping response: {}".format(data.decode())) -    sys.exit(1) +        data = sock.recv_multipart() +        print("Received: {}".format(len(data))) +        for i,part in enumerate(data): +            print("   {}".format(part)) + +        for i, part in enumerate(message_parts): +            if i == len(message_parts) - 1: +                f = 0 +            else: +                f = zmq.SNDMORE -print("Ping ok, sending request '{}'...".format(" ".join(message_parts))) +            print("Send {}({}): '{}'".format(i, f, part)) -for i, part in enumerate(message_parts): -    if i == len(message_parts) - 1: -        f = 0 -    else: -        f = zmq.SNDMORE +            sock.send(part.encode(), flags=f) -    sock.send(part.encode(), flags=f) +        data = sock.recv_multipart() -data = sock.recv_multipart() +        print("Received: {}".format(len(data))) +        for i,part in enumerate(data): +            print(" RX {}: {}".format(i, part)) -print("Received {} entries:".format(len(data))) -print("  " + "  ".join([d.decode() for d in data])) +else: +    print("ZMQ error: timeout") +    context.destroy(linger=5)  # This is free and unencumbered software released into the public domain.  # diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index c44738a..3300c89 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -2,7 +2,7 @@     Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2017     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -146,35 +146,6 @@ std::string ManagementServer::getValuesJSON()      return ss.str();  } -std::string ManagementServer::getStateJSON() -{ -    boost::mutex::scoped_lock lock(m_statsmutex); - -    std::ostringstream ss; -    ss << "{\n"; - -    std::map<std::string,InputStat*>::iterator iter; -    int i = 0; -    for(iter = m_inputStats.begin(); iter != m_inputStats.end(); -            ++iter, i++) -    { -        const std::string& id = iter->first; -        InputStat* stats = iter->second; - -        if (i > 0) { -            ss << " ,\n"; -        } - -        ss << " \"" << id << "\" : "; -        ss << stats->encodeStateJSON(); -        stats->reset(); -    } - -    ss << "}\n"; - -    return ss.str(); -} -  void ManagementServer::restart()  {      m_restarter_thread = boost::thread(&ManagementServer::restart_thread, @@ -244,9 +215,6 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message)          else if (data == "values") {              answer << getValuesJSON();          } -        else if (data == "state") { -            answer << getStateJSON(); -        }          else if (data == "getptree") {              boost::unique_lock<boost::mutex> lock(m_configmutex);              boost::property_tree::json_parser::write_json(answer, m_pt); @@ -305,44 +273,34 @@ std::string InputStat::encodeValuesJSON()          "\"peak_left\": " << dB_l << ", "          "\"peak_right\": " << dB_r << ", "          "\"num_underruns\": " << num_underruns << ", " -        "\"num_overruns\": " << num_overruns << -    " } }"; +        "\"num_overruns\": " << num_overruns << ", "; -    return ss.str(); -} - -std::string InputStat::encodeStateJSON() -{ -    std::ostringstream ss; - -    ss << "{ \"state\" : "; +    ss << "\"state\": ";      switch (determineState()) {          case NoData: -            ss << "\"NoData\""; +            ss << "\"NoData (1)\"";              break;          case Unstable: -            ss << "\"Unstable\""; +            ss << "\"Unstable (2)\"";              break;          case Silence: -            ss << "\"Silent\""; +            ss << "\"Silent (3)\"";              break;          case Streaming: -            ss << "\"Streaming\""; +            ss << "\"Streaming (4)\"";              break;          default: -            ss << "\"Unknown\""; +            ss << "\"Unknown (0)\"";      } -    ss << " }"; +    ss << " } }";      return ss.str();  }  input_state_t InputStat::determineState()  { -    boost::mutex::scoped_lock lock(m_mutex); -      time_t now = time(nullptr);      input_state_t state; diff --git a/src/ManagementServer.h b/src/ManagementServer.h index aade3d0..e75aed3 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -2,7 +2,7 @@     Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2017     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -18,13 +18,11 @@     The responds in JSON, and accepts the commands:      - config      - values -      Inspired by the munin equivalent - -    - state -      Returns the state of each input +      Inspired by the munin equivalent, returns the configuration +      and the statistics values for every exported stat.      - getptree -      Returns the internal boost property_tree that contains the  +      Returns the internal boost property_tree that contains the        multiplexer configuration DB.     The server is using REQ/REP ZeroMQ sockets. @@ -46,8 +44,7 @@     along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.     */ -#ifndef __MANAGEMENT_SERVER_H -#define __MANAGEMENT_SERVER_H +#pragma once  #ifdef HAVE_CONFIG_H  #   include "config.h" @@ -278,8 +275,6 @@ class InputStat          std::string encodeValuesJSON(void); -        std::string encodeStateJSON(void); -          input_state_t determineState(void);      private: @@ -394,12 +389,6 @@ class ManagementServer           */          std::string getValuesJSON(); -        /* Return the state of each input -         * -         * returns: JSON encoded state -         */ -        std::string getStateJSON(); -          // mutex for accessing the map          boost::mutex m_statsmutex; @@ -412,5 +401,3 @@ class ManagementServer  // a reference to it  ManagementServer& get_mgmt_server(); -#endif - | 
