From 72420013928a2d00855ed8ae9c42ac6e229a0b87 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 19 Jan 2014 18:40:41 +0100 Subject: change zmqInput buffering handling --- src/dabInputZmq.cpp | 66 +++++++++++++++++++++++++++++++++++++++++------------ src/dabInputZmq.h | 11 ++++++--- 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 09b4688..b560313 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -121,13 +121,39 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int int rc; dabInputZmqData* input = (dabInputZmqData*)args; - // Get new ZMQ messages - if (input->frame_buffer.size() < INPUT_ZMQ_MAX_BUFFER_SIZE) { - rc = dabInputZmqReadFromSocket(input, size); - } - else { + /* We must *always* read data from the ZMQ socket, + * to make sure that ZMQ internal buffers are emptied + * quickly. It's the only way to control the buffers + * of the whole path from encoder to our frame_buffer. + */ + rc = dabInputZmqReadFromSocket(input, size); + + /* Notify of a buffer overrun, and drop some frames */ + if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { global_stats.notifyOverrun(input->uri); - rc = 0; + + /* If the buffer is really too full, we drop as many frames as needed + * to get down to the prebuffering size. We would like to have our buffer + * filled to the prebuffering length. + */ + if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { + size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; + + while (over_max--) { + input->frame_buffer.pop_front(); + } + } + else { + /* Our frame_buffer contains DAB logical frames. Five of these make one + * AAC superframe. + * + * Dropping this superframe amounts to dropping 120ms of audio. */ + input->frame_buffer.pop_front(); + input->frame_buffer.pop_front(); + input->frame_buffer.pop_front(); + input->frame_buffer.pop_front(); + input->frame_buffer.pop_front(); + } } if (input->prebuffering > 0) { @@ -136,6 +162,8 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int if (input->prebuffering == 0) etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", input->uri.c_str()); + + /* During prebuffering, give a zeroed frame to the mux */ global_stats.notifyUnderrun(input->uri); memset(buffer, 0, size); return size; @@ -149,12 +177,16 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int input->uri.c_str()); // reset prebuffering input->prebuffering = INPUT_ZMQ_PREBUFFERING; + + /* We have no data to give, we give a zeroed frame */ global_stats.notifyUnderrun(input->uri); memset(buffer, 0, size); return size; } else { + /* Normal situation, give a frame from the frame_buffer */ + char* newframe = input->frame_buffer.front(); memcpy(buffer, newframe, size); delete[] newframe; @@ -179,20 +211,24 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) if (errno != EAGAIN) { etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno)); } + zmq_msg_close(&msg); return 0; } char* data = (char*)zmq_msg_data(&msg); - if (nBytes == 5*framesize) // five frames per superframe + /* TS 102 563, Section 6: + * Audio super frames are transported in five successive DAB logical frames + * with additional error protection. + */ + if (nBytes == 5*framesize) { if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { - etiLog.log(warn, "inputZMQ %s input buffer full (%d), dropping frame !\n", - input->frame_buffer.size(), input->uri.c_str()); + etiLog.level(warn) << + "inputZMQ " << input->uri << + " buffer full (" << input->frame_buffer.size() << ")," + " dropping incoming superframe !"; nBytes = 0; - - // we actually drop 2 frames - input->frame_buffer.pop_front(); } else { // copy the input frame blockwise into the frame_buffer @@ -207,8 +243,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) } else { - etiLog.log(error, "ZMQ Wrong data size: recv'd %d, need %d \n", - nBytes, 5*framesize); + etiLog.level(error) << + "inputZMQ " << input->uri << + " wrong data size: recv'd" << nBytes << + ", need " << 5*framesize << "."; nBytes = 0; } diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 237209f..56708f9 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -49,11 +49,16 @@ #include "dabInputFifo.h" #include "StatsServer.h" -// Number of frames to prebuffer -#define INPUT_ZMQ_PREBUFFERING 10 +/* The frame_buffer contains DAB logical frames as defined in + * TS 102 563, section 6. + * Five elements of this buffer make one AAC superframe (120ms audio) + */ + +// Number of elements to prebuffer before starting the pipeline +#define INPUT_ZMQ_PREBUFFERING (5*4) // 480ms // Maximum frame_buffer size in number of elements -#define INPUT_ZMQ_MAX_BUFFER_SIZE 200 +#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms extern struct dabInputOperations dabInputZmqOperations; -- cgit v1.2.3