summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README2
-rwxr-xr-xdoc/stats_dabmux_multi.py116
-rw-r--r--src/DabMux.cpp5
-rw-r--r--src/Makefile.am3
-rw-r--r--src/TestStatsServer.cpp4
-rw-r--r--src/dabInputZmq.cpp13
-rw-r--r--src/dabInputZmq.h2
7 files changed, 140 insertions, 5 deletions
diff --git a/README b/README
index 0dab2e5..30e6f03 100644
--- a/README
+++ b/README
@@ -34,6 +34,8 @@ to the official one:
- timestamping support required for SFN
- a ZeroMQ ETI output that can be used with CRC-DABMOD
- a ZeroMQ dabplus input that can be used with fdk-aac-dabplus-zmq
+- supports logging to syslog
+- supports ZMQ input monitoring with munin tool
The src/ directory contains the source code of CRC-DabMux.
diff --git a/doc/stats_dabmux_multi.py b/doc/stats_dabmux_multi.py
new file mode 100755
index 0000000..bd6b9b1
--- /dev/null
+++ b/doc/stats_dabmux_multi.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python2
+#
+# present statistics from dabmux Stats Server
+# to munin
+
+import sys
+import json
+import socket
+
+config_template_top = """
+multigraph zmq_inbuf
+graph_title {title}
+graph_order high low underruns overruns
+graph_args --base 1000
+graph_vlabel max/min buffer size bytes during last ${{graph_period}}
+graph_category dabmux
+graph_info This graph shows the high and low buffer sizes for ZMQ inputs
+
+high.info Max buffer size
+high.label Bytes
+high.min 0
+low.info Min buffer size
+low.label Bytes
+low.min 0
+underruns.info Number of underruns
+underruns.label Occurrencies
+underruns.min 0
+overruns.info Number of overruns
+overruns.label Occurrencies
+overruns.min 0
+
+"""
+
+config_template_individual = """
+multigraph zmq_inbuf.id_{ident}
+
+graph_title Contribution {ident} buffer
+graph_order high low
+graph_args --base 1000
+graph_vlabel max/min buffer size bytes during last ${{graph_period}}
+graph_category dabmux
+graph_info This graph shows the high and low buffer sizes for the {ident} ZMQ input
+
+high.info Max buffer size
+high.label Bytes
+high.min 0
+high.warning 1:
+low.info Min buffer size
+low.label Bytes
+low.min 0
+low.warning 1:
+underruns.info Number of underruns
+underruns.label Occurrencies
+underruns.min 0
+underruns.warning 1:
+overruns.info Number of overruns
+overruns.label Occurrencies
+overruns.min 0
+overruns.warning 1:
+"""
+
+#TODO enable
+#if not os.environ.get("MUNIN_CAP_MULTIGRAPH"):
+# print("This needs munin version 1.4 at least")
+# sys.exit(1)
+
+def connect():
+ """Create a connection to the dabmux stats server
+
+ returns: the socket"""
+
+ sock = socket.socket()
+ sock.connect(("localhost", 12720))
+
+ version = json.loads(sock.recv(256))
+
+ if not version['service'].startswith("CRC-DabMux"):
+ sys.stderr.write("Wrong version\n")
+ sys.exit(1)
+
+ return sock
+
+def get_id_from_uri(uri):
+ proto, _, port = uri.partition('://*:')
+ return "zmq-" + proto + "-" + port
+
+if len(sys.argv) == 1:
+ sock = connect()
+ sock.send("values\n")
+ values = json.loads(sock.recv(256))['values']
+
+ munin_values = ""
+ for ident in values:
+ v = values[ident]['inputstat']
+ munin_values += "multigraph zmq_inbuf.id_{ident}\n".format(ident=get_id_from_uri(ident))
+ munin_values += "high.value {}\n".format(v['max_fill'])
+ munin_values += "low.value {}\n".format(v['min_fill'])
+ munin_values += "underruns.value {}\n".format(v['num_underruns'])
+ munin_values += "overruns.value {}\n".format(v['num_overruns'])
+
+ print(munin_values)
+
+elif len(sys.argv) == 2 and sys.argv[1] == "config":
+ sock = connect()
+
+ sock.send("config\n")
+
+ config = json.loads(sock.recv(256))
+
+ munin_config = config_template_top.format(title="dabmux ZMQ input buffers")
+
+ for conf in config['config']:
+ munin_config += config_template_individual.format(ident=get_id_from_uri(conf))
+
+ print(munin_config)
+
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index bd8e685..b735cf7 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -117,11 +117,14 @@ typedef DWORD32 uint32_t;
#include "utils.h"
#include "ParserCmdline.h"
#include "ParserConfigfile.h"
-
+#include "StatsServer.h"
#include "Log.h"
using namespace std;
+/* Global stats server */
+StatsServer global_stats(12720); //TODO define port
+
static unsigned char Padding_FIB[] = {
0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
diff --git a/src/Makefile.am b/src/Makefile.am
index 6dea558..910d5bc 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -38,7 +38,7 @@ ZMQ_LIBS =
endif
crc_dabmux_CPPFLAGS =-I$(FARSYNC_DIR) $(GITVERSION_FLAGS)
-crc_dabmux_LDADD =$(FEC_LIBS) $(ZMQ_LIBS) -lpthread
+crc_dabmux_LDADD =$(FEC_LIBS) $(ZMQ_LIBS) -lpthread -lboost_thread -lboost_system
crc_dabmux_SOURCES =DabMux.cpp \
dabInput.h dabInput.cpp \
dabInputBridgeUdp.h dabInputBridgeUdp.cpp \
@@ -85,6 +85,7 @@ crc_dabmux_SOURCES =DabMux.cpp \
Interleaver.h Interleaver.cpp \
ReedSolomon.h ReedSolomon.cpp \
mpeg.h mpeg.c \
+ StatsServer.h StatsServer.cpp \
TcpServer.h TcpServer.cpp \
TcpSocket.h TcpSocket.cpp
diff --git a/src/TestStatsServer.cpp b/src/TestStatsServer.cpp
index 7ec0342..0d86195 100644
--- a/src/TestStatsServer.cpp
+++ b/src/TestStatsServer.cpp
@@ -18,6 +18,10 @@ int main(int argc, char **argv)
if (stats_example[i] == 0) {
serv.notifyUnderrun("foo");
}
+
+ if (stats_example[i] == 56) {
+ serv.notifyOverrun("foo");
+ }
}
}
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index fc975bc..09b4688 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -35,6 +35,7 @@
#include "dabInput.h"
#include "dabInputZmq.h"
#include "dabInputFifo.h"
+#include "StatsServer.h"
#include <stdio.h>
#include <zmq.h>
@@ -49,6 +50,7 @@
#ifdef HAVE_INPUT_ZEROMQ
+extern StatsServer global_stats;
struct dabInputOperations dabInputZmqOperations = {
dabInputZmqInit,
@@ -68,8 +70,6 @@ struct dabInputOperations dabInputZmqOperations = {
int dabInputZmqInit(void** args)
{
dabInputZmqData* input = new dabInputZmqData;
- memset(&input->stats, 0, sizeof(input->stats));
- input->stats.id = dabInputFifoData::nb++;
input->zmq_context = zmq_ctx_new();
if (input->zmq_context == NULL) {
etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno));
@@ -108,11 +108,14 @@ int dabInputZmqOpen(void* args, const char* inputUri)
return 1;
}
+ global_stats.registerInput(uri);
+
input->uri = uri;
return 0;
}
+// size corresponds to a frame size. It is constant for a given bitrate
int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size)
{
int rc;
@@ -123,6 +126,7 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int
rc = dabInputZmqReadFromSocket(input, size);
}
else {
+ global_stats.notifyOverrun(input->uri);
rc = 0;
}
@@ -132,15 +136,20 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int
if (input->prebuffering == 0)
etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
input->uri.c_str());
+ global_stats.notifyUnderrun(input->uri);
memset(buffer, 0, size);
return size;
}
+ // Save stats data in bytes, not in frames
+ global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size);
+
if (input->frame_buffer.empty()) {
etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
input->uri.c_str());
// reset prebuffering
input->prebuffering = INPUT_ZMQ_PREBUFFERING;
+ global_stats.notifyUnderrun(input->uri);
memset(buffer, 0, size);
return size;
}
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index 17a4954..237209f 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -47,6 +47,7 @@
#include <string>
#include "dabInput.h"
#include "dabInputFifo.h"
+#include "StatsServer.h"
// Number of frames to prebuffer
#define INPUT_ZMQ_PREBUFFERING 10
@@ -61,7 +62,6 @@ struct dabInputZmqData {
void* zmq_context;
void* zmq_sock;
std::list<char*> frame_buffer; //stores elements of type char[<framesize>]
- dabInputFifoStats stats;
int prebuffering;
std::string uri;
};