diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/dabInputZmq.cpp | 66 | ||||
| -rw-r--r-- | src/dabInputZmq.h | 11 | 
2 files changed, 60 insertions, 17 deletions
| diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 09b4688..b560313 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -121,13 +121,39 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int      int rc;      dabInputZmqData* input = (dabInputZmqData*)args; -    // Get new ZMQ messages -    if (input->frame_buffer.size() < INPUT_ZMQ_MAX_BUFFER_SIZE) { -        rc = dabInputZmqReadFromSocket(input, size); -    } -    else { +    /* We must *always* read data from the ZMQ socket, +     * to make sure that ZMQ internal buffers are emptied +     * quickly. It's the only way to control the buffers +     * of the whole path from encoder to our frame_buffer. +     */ +    rc = dabInputZmqReadFromSocket(input, size); + +    /* Notify of a buffer overrun, and drop some frames */ +    if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {          global_stats.notifyOverrun(input->uri); -        rc = 0; + +        /* If the buffer is really too full, we drop as many frames as needed +         * to get down to the prebuffering size. We would like to have our buffer +         * filled to the prebuffering length. +         */ +        if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { +            size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; + +            while (over_max--) { +                input->frame_buffer.pop_front(); +            } +        } +        else { +            /* Our frame_buffer contains DAB logical frames. Five of these make one +             * AAC superframe. +             * +             * Dropping this superframe amounts to dropping 120ms of audio. */ +            input->frame_buffer.pop_front(); +            input->frame_buffer.pop_front(); +            input->frame_buffer.pop_front(); +            input->frame_buffer.pop_front(); +            input->frame_buffer.pop_front(); +        }      }      if (input->prebuffering > 0) { @@ -136,6 +162,8 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int          if (input->prebuffering == 0)              etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",                  input->uri.c_str()); + +        /* During prebuffering, give a zeroed frame to the mux */          global_stats.notifyUnderrun(input->uri);          memset(buffer, 0, size);          return size; @@ -149,12 +177,16 @@ int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int                  input->uri.c_str());          // reset prebuffering          input->prebuffering = INPUT_ZMQ_PREBUFFERING; + +        /* We have no data to give, we give a zeroed frame */          global_stats.notifyUnderrun(input->uri);          memset(buffer, 0, size);          return size;      }      else      { +        /* Normal situation, give a frame from the frame_buffer */ +          char* newframe = input->frame_buffer.front();          memcpy(buffer, newframe, size);          delete[] newframe; @@ -179,20 +211,24 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)          if (errno != EAGAIN) {              etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno));          } +        zmq_msg_close(&msg);          return 0;      }      char* data = (char*)zmq_msg_data(&msg); -    if (nBytes == 5*framesize) // five frames per superframe +    /* TS 102 563, Section 6: +     * Audio super frames are transported in five successive DAB logical frames +     * with additional error protection. +     */ +    if (nBytes == 5*framesize)      {          if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { -            etiLog.log(warn, "inputZMQ %s input buffer full (%d), dropping frame !\n", -                input->frame_buffer.size(), input->uri.c_str()); +            etiLog.level(warn) << +                "inputZMQ " << input->uri << +                " buffer full (" << input->frame_buffer.size() << ")," +                " dropping incoming superframe !";              nBytes = 0; - -            // we actually drop 2 frames -            input->frame_buffer.pop_front();          }          else {              // copy the input frame blockwise into the frame_buffer @@ -207,8 +243,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)      }      else      { -        etiLog.log(error, "ZMQ Wrong data size: recv'd %d, need %d \n", -                nBytes, 5*framesize); +        etiLog.level(error) << +            "inputZMQ " << input->uri << +            " wrong data size: recv'd" << nBytes << +            ", need " << 5*framesize << ".";          nBytes = 0;      } diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 237209f..56708f9 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -49,11 +49,16 @@  #include "dabInputFifo.h"  #include "StatsServer.h" -// Number of frames to prebuffer -#define INPUT_ZMQ_PREBUFFERING 10 +/* The frame_buffer contains DAB logical frames as defined in + * TS 102 563, section 6. + * Five elements of this buffer make one AAC superframe (120ms audio) + */ + +// Number of elements to prebuffer before starting the pipeline +#define INPUT_ZMQ_PREBUFFERING (5*4) // 480ms  // Maximum frame_buffer size in number of elements -#define INPUT_ZMQ_MAX_BUFFER_SIZE 200 +#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms  extern struct dabInputOperations dabInputZmqOperations; | 
