diff options
| -rw-r--r-- | README | 2 | ||||
| -rwxr-xr-x | doc/stats_dabmux_multi.py | 116 | ||||
| -rw-r--r-- | src/DabMux.cpp | 5 | ||||
| -rw-r--r-- | src/Makefile.am | 3 | ||||
| -rw-r--r-- | src/TestStatsServer.cpp | 4 | ||||
| -rw-r--r-- | src/dabInputZmq.cpp | 13 | ||||
| -rw-r--r-- | src/dabInputZmq.h | 2 | 
7 files changed, 140 insertions, 5 deletions
@@ -34,6 +34,8 @@ to the official one:  - timestamping support required for SFN  - a ZeroMQ ETI output that can be used with CRC-DABMOD  - a ZeroMQ dabplus input that can be used with fdk-aac-dabplus-zmq +- supports logging to syslog +- supports ZMQ input monitoring with munin tool  The src/ directory contains the source code of CRC-DabMux. diff --git a/doc/stats_dabmux_multi.py b/doc/stats_dabmux_multi.py new file mode 100755 index 0000000..bd6b9b1 --- /dev/null +++ b/doc/stats_dabmux_multi.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python2 +# +# present statistics from dabmux Stats Server +# to munin + +import sys +import json +import socket + +config_template_top = """ +multigraph zmq_inbuf +graph_title {title} +graph_order high low underruns overruns +graph_args --base 1000 +graph_vlabel max/min buffer size bytes during last ${{graph_period}} +graph_category dabmux +graph_info This graph shows the high and low buffer sizes for ZMQ inputs + +high.info Max buffer size +high.label Bytes +high.min 0 +low.info Min buffer size +low.label Bytes +low.min 0 +underruns.info Number of underruns +underruns.label Occurrencies +underruns.min 0 +overruns.info Number of overruns +overruns.label Occurrencies +overruns.min 0 + +""" + +config_template_individual = """ +multigraph zmq_inbuf.id_{ident} + +graph_title Contribution {ident} buffer +graph_order high low +graph_args --base 1000 +graph_vlabel max/min buffer size bytes during last ${{graph_period}} +graph_category dabmux +graph_info This graph shows the high and low buffer sizes for the {ident} ZMQ input + +high.info Max buffer size +high.label Bytes +high.min 0 +high.warning 1: +low.info Min buffer size +low.label Bytes +low.min 0 +low.warning 1: +underruns.info Number of underruns +underruns.label Occurrencies +underruns.min 0 +underruns.warning 1: +overruns.info Number of overruns +overruns.label Occurrencies +overruns.min 0 +overruns.warning 1: +""" + +#TODO enable +#if not os.environ.get("MUNIN_CAP_MULTIGRAPH"): +#    print("This needs munin version 1.4 at least") +#    sys.exit(1) + +def connect(): +    """Create a connection to the dabmux stats server + +    returns: the socket""" + +    sock = socket.socket() +    sock.connect(("localhost", 12720)) + +    version = json.loads(sock.recv(256)) + +    if not version['service'].startswith("CRC-DabMux"): +        sys.stderr.write("Wrong version\n") +        sys.exit(1) + +    return sock + +def get_id_from_uri(uri): +    proto, _, port = uri.partition('://*:') +    return "zmq-" + proto + "-" + port + +if len(sys.argv) == 1: +    sock = connect() +    sock.send("values\n") +    values = json.loads(sock.recv(256))['values'] + +    munin_values = "" +    for ident in values: +        v = values[ident]['inputstat'] +        munin_values += "multigraph zmq_inbuf.id_{ident}\n".format(ident=get_id_from_uri(ident)) +        munin_values += "high.value {}\n".format(v['max_fill']) +        munin_values += "low.value {}\n".format(v['min_fill']) +        munin_values += "underruns.value {}\n".format(v['num_underruns']) +        munin_values += "overruns.value {}\n".format(v['num_overruns']) + +    print(munin_values) + +elif len(sys.argv) == 2 and sys.argv[1] == "config": +    sock = connect() + +    sock.send("config\n") + +    config = json.loads(sock.recv(256)) + +    munin_config = config_template_top.format(title="dabmux ZMQ input buffers") + +    for conf in config['config']: +        munin_config += config_template_individual.format(ident=get_id_from_uri(conf)) + +    print(munin_config) + diff --git a/src/DabMux.cpp b/src/DabMux.cpp index bd8e685..b735cf7 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -117,11 +117,14 @@ typedef DWORD32 uint32_t;  #include "utils.h"  #include "ParserCmdline.h"  #include "ParserConfigfile.h" - +#include "StatsServer.h"  #include "Log.h"  using namespace std; +/* Global stats server */ +StatsServer global_stats(12720); //TODO define port +  static unsigned char Padding_FIB[] = {      0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, diff --git a/src/Makefile.am b/src/Makefile.am index 6dea558..910d5bc 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -38,7 +38,7 @@ ZMQ_LIBS    =  endif  crc_dabmux_CPPFLAGS =-I$(FARSYNC_DIR) $(GITVERSION_FLAGS) -crc_dabmux_LDADD    =$(FEC_LIBS) $(ZMQ_LIBS) -lpthread +crc_dabmux_LDADD    =$(FEC_LIBS) $(ZMQ_LIBS) -lpthread -lboost_thread -lboost_system  crc_dabmux_SOURCES  =DabMux.cpp \                       dabInput.h dabInput.cpp \                       dabInputBridgeUdp.h dabInputBridgeUdp.cpp \ @@ -85,6 +85,7 @@ crc_dabmux_SOURCES  =DabMux.cpp \                       Interleaver.h Interleaver.cpp \                       ReedSolomon.h ReedSolomon.cpp \                       mpeg.h mpeg.c \ +                     StatsServer.h StatsServer.cpp \                       TcpServer.h TcpServer.cpp \                       TcpSocket.h TcpSocket.cpp diff --git a/src/TestStatsServer.cpp b/src/TestStatsServer.cpp index 7ec0342..0d86195 100644 --- a/src/TestStatsServer.cpp +++ b/src/TestStatsServer.cpp @@ -18,6 +18,10 @@ int main(int argc, char **argv)              if (stats_example[i] == 0) {                  serv.notifyUnderrun("foo");              } + +            if (stats_example[i] == 56) { +                serv.notifyOverrun("foo"); +            }          }      } diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index fc975bc..09b4688 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -35,6 +35,7 @@  #include "dabInput.h"  #include "dabInputZmq.h"  #include "dabInputFifo.h" +#include "StatsServer.h"  #include <stdio.h>  #include <zmq.h> @@ -49,6 +50,7 @@  #ifdef HAVE_INPUT_ZEROMQ +extern StatsServer global_stats;  struct dabInputOperations dabInputZmqOperations = {      dabInputZmqInit, @@ -68,8 +70,6 @@ struct dabInputOperations dabInputZmqOperations = {  int dabInputZmqInit(void** args)  {      dabInputZmqData* input = new dabInputZmqData; -    memset(&input->stats, 0, sizeof(input->stats)); -    input->stats.id = dabInputFifoData::nb++;      input->zmq_context = zmq_ctx_new();      if (input->zmq_context == NULL) {          etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno)); @@ -108,11 +108,14 @@ int dabInputZmqOpen(void* args, const char* inputUri)          return 1;      } +    global_stats.registerInput(uri); +      input->uri = uri;      return 0;  } +// size corresponds to a frame size. It is constant for a given bitrate  int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size)  {      int rc; @@ -123,6 +126,7 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int          rc = dabInputZmqReadFromSocket(input, size);      }      else { +        global_stats.notifyOverrun(input->uri);          rc = 0;      } @@ -132,15 +136,20 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int          if (input->prebuffering == 0)              etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",                  input->uri.c_str()); +        global_stats.notifyUnderrun(input->uri);          memset(buffer, 0, size);          return size;      } +    // Save stats data in bytes, not in frames +    global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size); +      if (input->frame_buffer.empty()) {          etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",                  input->uri.c_str());          // reset prebuffering          input->prebuffering = INPUT_ZMQ_PREBUFFERING; +        global_stats.notifyUnderrun(input->uri);          memset(buffer, 0, size);          return size;      } diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 17a4954..237209f 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -47,6 +47,7 @@  #include <string>  #include "dabInput.h"  #include "dabInputFifo.h" +#include "StatsServer.h"  // Number of frames to prebuffer  #define INPUT_ZMQ_PREBUFFERING 10 @@ -61,7 +62,6 @@ struct dabInputZmqData {      void* zmq_context;      void* zmq_sock;      std::list<char*> frame_buffer; //stores elements of type char[<framesize>] -    dabInputFifoStats stats;      int prebuffering;      std::string uri;  };  | 
