aboutsummaryrefslogtreecommitdiffstats
path: root/src/dabInputZmq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dabInputZmq.cpp')
-rw-r--r--src/dabInputZmq.cpp13
1 files changed, 11 insertions, 2 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index fc975bc..09b4688 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -35,6 +35,7 @@
#include "dabInput.h"
#include "dabInputZmq.h"
#include "dabInputFifo.h"
+#include "StatsServer.h"
#include <stdio.h>
#include <zmq.h>
@@ -49,6 +50,7 @@
#ifdef HAVE_INPUT_ZEROMQ
+extern StatsServer global_stats;
struct dabInputOperations dabInputZmqOperations = {
dabInputZmqInit,
@@ -68,8 +70,6 @@ struct dabInputOperations dabInputZmqOperations = {
int dabInputZmqInit(void** args)
{
dabInputZmqData* input = new dabInputZmqData;
- memset(&input->stats, 0, sizeof(input->stats));
- input->stats.id = dabInputFifoData::nb++;
input->zmq_context = zmq_ctx_new();
if (input->zmq_context == NULL) {
etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno));
@@ -108,11 +108,14 @@ int dabInputZmqOpen(void* args, const char* inputUri)
return 1;
}
+ global_stats.registerInput(uri);
+
input->uri = uri;
return 0;
}
+// size corresponds to a frame size. It is constant for a given bitrate
int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size)
{
int rc;
@@ -123,6 +126,7 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int
rc = dabInputZmqReadFromSocket(input, size);
}
else {
+ global_stats.notifyOverrun(input->uri);
rc = 0;
}
@@ -132,15 +136,20 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int
if (input->prebuffering == 0)
etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
input->uri.c_str());
+ global_stats.notifyUnderrun(input->uri);
memset(buffer, 0, size);
return size;
}
+ // Save stats data in bytes, not in frames
+ global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size);
+
if (input->frame_buffer.empty()) {
etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
input->uri.c_str());
// reset prebuffering
input->prebuffering = INPUT_ZMQ_PREBUFFERING;
+ global_stats.notifyUnderrun(input->uri);
memset(buffer, 0, size);
return size;
}