From e79bbb00757cb6e7a415aa613d52253b12ca26a8 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 25 Apr 2014 11:00:29 +0200 Subject: Add support for new zmq frame format --- src/dabInputZmq.cpp | 69 +++++++++++++++++++++++++++++++++++------------------ src/dabInputZmq.h | 30 ++++++++++++++++++++++- 2 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 4b04b75..2a18588 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -329,7 +329,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size) else { /* Normal situation, give a frame from the frame_buffer */ - char* newframe = m_frame_buffer.front(); + uint8_t* newframe = m_frame_buffer.front(); memcpy(buffer, newframe, size); delete[] newframe; m_frame_buffer.pop_front(); @@ -372,7 +372,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize) } else if (m_enable_input) { // copy the input frame blockwise into the frame_buffer - char* frame = new char[framesize]; + uint8_t* frame = new uint8_t[framesize]; memcpy(frame, data, framesize); m_frame_buffer.push_back(frame); } @@ -413,43 +413,66 @@ int DabInputZmqAAC::readFromSocket(size_t framesize) m_name << ": " << err.what(); } - char* data = (char*)msg.data(); + /* This is the old 'one superframe per ZMQ message' format */ + uint8_t* data = (uint8_t*)msg.data(); + size_t datalen = msg.size(); + + /* Look for the new zmq_frame_header_t format */ + zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); + + if (msg.size() == ZMQ_FRAME_SIZE(frame) && + frame->version == 1 && + frame->encoder == ZMQ_ENCODER_FDK) { + datalen = frame->datasize; + data = ZMQ_FRAME_DATA(frame); + } + /* TS 102 563, Section 6: * Audio super frames are transported in five successive DAB logical frames * with additional error protection. */ - if (msg.size() == 5*framesize) + if (datalen) { - if (m_frame_buffer.size() > m_config.buffer_size) { - etiLog.level(warn) << - "inputZMQ " << m_name << - " buffer full (" << m_frame_buffer.size() << ")," - " dropping incoming superframe !"; - messageReceived = 0; - } - else if (m_enable_input) { - // copy the input frame blockwise into the frame_buffer - for (char* framestart = data; - framestart < &data[5*framesize]; - framestart += framesize) { - char* frame = new char[framesize]; - memcpy(frame, framestart, framesize); - m_frame_buffer.push_back(frame); + if (datalen == 5*framesize) + { + if (m_frame_buffer.size() > m_config.buffer_size) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming superframe !"; + messageReceived = 0; + } + else if (m_enable_input) { + // copy the input frame blockwise into the frame_buffer + for (uint8_t* framestart = data; + framestart < &data[5*framesize]; + framestart += framesize) { + uint8_t* audioframe = new uint8_t[framesize]; + memcpy(audioframe, framestart, framesize); + m_frame_buffer.push_back(audioframe); + } + } + else { + datalen = 0; } } else { - return 0; + etiLog.level(error) << + "inputZMQ " << m_name << + " wrong data size: recv'd " << msg.size() << + ", need " << 5*framesize << "."; + + datalen = 0; } } else { etiLog.level(error) << "inputZMQ " << m_name << - " wrong data size: recv'd " << msg.size() << - ", need " << 5*framesize << "."; + " invalid frame received"; } - return msg.size(); + return datalen; } /********* REMOTE CONTROL ***********/ diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index b74b1e8..3ef7233 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -51,6 +51,7 @@ #include #include +#include #include "zmq.hpp" #include "dabInput.h" #include "StatsServer.h" @@ -127,6 +128,33 @@ struct dab_input_zmq_config_t std::string curve_encoder_keyfile; }; +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +/* This defines the on-wire representation of a ZMQ message header. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ + uint16_t version; // we support version=1 now + uint16_t encoder; // see ZMQ_ENCODER_XYZ + + /* length of the 'data' field */ + uint32_t datasize; + + /* Audio level, peak, linear PCM */ + int16_t audiolevel_left; + int16_t audiolevel_right; + + /* Data follows this header */ +} __attribute__ ((packed)); + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) ) + + class DabInputZmqBase : public DabInputBase, public RemoteControllable { public: DabInputZmqBase(const std::string name, @@ -191,7 +219,7 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { bool m_enable_input; /* stores elements of type char[] */ - std::list m_frame_buffer; + std::list m_frame_buffer; dab_input_zmq_config_t m_config; -- cgit v1.2.3