aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-01-19 18:40:41 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-01-19 18:40:41 +0100
commit72420013928a2d00855ed8ae9c42ac6e229a0b87 (patch)
tree4a6a513806b838b8929bebba572490f43775a1c2 /src
parentb25cc4973eb70833fa21481168e9291bc852904d (diff)
downloaddabmux-72420013928a2d00855ed8ae9c42ac6e229a0b87.tar.gz
dabmux-72420013928a2d00855ed8ae9c42ac6e229a0b87.tar.bz2
dabmux-72420013928a2d00855ed8ae9c42ac6e229a0b87.zip
change zmqInput buffering handling
Diffstat (limited to 'src')
-rw-r--r--src/dabInputZmq.cpp66
-rw-r--r--src/dabInputZmq.h11
2 files changed, 60 insertions, 17 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index 09b4688..b560313 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -121,13 +121,39 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int
int rc;
dabInputZmqData* input = (dabInputZmqData*)args;
- // Get new ZMQ messages
- if (input->frame_buffer.size() < INPUT_ZMQ_MAX_BUFFER_SIZE) {
- rc = dabInputZmqReadFromSocket(input, size);
- }
- else {
+ /* We must *always* read data from the ZMQ socket,
+ * to make sure that ZMQ internal buffers are emptied
+ * quickly. It's the only way to control the buffers
+ * of the whole path from encoder to our frame_buffer.
+ */
+ rc = dabInputZmqReadFromSocket(input, size);
+
+ /* Notify of a buffer overrun, and drop some frames */
+ if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {
global_stats.notifyOverrun(input->uri);
- rc = 0;
+
+ /* If the buffer is really too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length.
+ */
+ if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;
+
+ while (over_max--) {
+ input->frame_buffer.pop_front();
+ }
+ }
+ else {
+ /* Our frame_buffer contains DAB logical frames. Five of these make one
+ * AAC superframe.
+ *
+ * Dropping this superframe amounts to dropping 120ms of audio. */
+ input->frame_buffer.pop_front();
+ input->frame_buffer.pop_front();
+ input->frame_buffer.pop_front();
+ input->frame_buffer.pop_front();
+ input->frame_buffer.pop_front();
+ }
}
if (input->prebuffering > 0) {
@@ -136,6 +162,8 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int
if (input->prebuffering == 0)
etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
input->uri.c_str());
+
+ /* During prebuffering, give a zeroed frame to the mux */
global_stats.notifyUnderrun(input->uri);
memset(buffer, 0, size);
return size;
@@ -149,12 +177,16 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int
input->uri.c_str());
// reset prebuffering
input->prebuffering = INPUT_ZMQ_PREBUFFERING;
+
+ /* We have no data to give, we give a zeroed frame */
global_stats.notifyUnderrun(input->uri);
memset(buffer, 0, size);
return size;
}
else
{
+ /* Normal situation, give a frame from the frame_buffer */
+
char* newframe = input->frame_buffer.front();
memcpy(buffer, newframe, size);
delete[] newframe;
@@ -179,20 +211,24 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
if (errno != EAGAIN) {
etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno));
}
+ zmq_msg_close(&msg);
return 0;
}
char* data = (char*)zmq_msg_data(&msg);
- if (nBytes == 5*framesize) // five frames per superframe
+ /* TS 102 563, Section 6:
+ * Audio super frames are transported in five successive DAB logical frames
+ * with additional error protection.
+ */
+ if (nBytes == 5*framesize)
{
if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
- etiLog.log(warn, "inputZMQ %s input buffer full (%d), dropping frame !\n",
- input->frame_buffer.size(), input->uri.c_str());
+ etiLog.level(warn) <<
+ "inputZMQ " << input->uri <<
+ " buffer full (" << input->frame_buffer.size() << "),"
+ " dropping incoming superframe !";
nBytes = 0;
-
- // we actually drop 2 frames
- input->frame_buffer.pop_front();
}
else {
// copy the input frame blockwise into the frame_buffer
@@ -207,8 +243,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
}
else
{
- etiLog.log(error, "ZMQ Wrong data size: recv'd %d, need %d \n",
- nBytes, 5*framesize);
+ etiLog.level(error) <<
+ "inputZMQ " << input->uri <<
+ " wrong data size: recv'd" << nBytes <<
+ ", need " << 5*framesize << ".";
nBytes = 0;
}
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index 237209f..56708f9 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -49,11 +49,16 @@
#include "dabInputFifo.h"
#include "StatsServer.h"
-// Number of frames to prebuffer
-#define INPUT_ZMQ_PREBUFFERING 10
+/* The frame_buffer contains DAB logical frames as defined in
+ * TS 102 563, section 6.
+ * Five elements of this buffer make one AAC superframe (120ms audio)
+ */
+
+// Number of elements to prebuffer before starting the pipeline
+#define INPUT_ZMQ_PREBUFFERING (5*4) // 480ms
// Maximum frame_buffer size in number of elements
-#define INPUT_ZMQ_MAX_BUFFER_SIZE 200
+#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms
extern struct dabInputOperations dabInputZmqOperations;