aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/dabInputZmq.cpp69
-rw-r--r--src/dabInputZmq.h30
2 files changed, 75 insertions, 24 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index 4b04b75..2a18588 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -329,7 +329,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
else
{
/* Normal situation, give a frame from the frame_buffer */
- char* newframe = m_frame_buffer.front();
+ uint8_t* newframe = m_frame_buffer.front();
memcpy(buffer, newframe, size);
delete[] newframe;
m_frame_buffer.pop_front();
@@ -372,7 +372,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize)
}
else if (m_enable_input) {
// copy the input frame blockwise into the frame_buffer
- char* frame = new char[framesize];
+ uint8_t* frame = new uint8_t[framesize];
memcpy(frame, data, framesize);
m_frame_buffer.push_back(frame);
}
@@ -413,43 +413,66 @@ int DabInputZmqAAC::readFromSocket(size_t framesize)
m_name << ": " << err.what();
}
- char* data = (char*)msg.data();
+ /* This is the old 'one superframe per ZMQ message' format */
+ uint8_t* data = (uint8_t*)msg.data();
+ size_t datalen = msg.size();
+
+ /* Look for the new zmq_frame_header_t format */
+ zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data();
+
+ if (msg.size() == ZMQ_FRAME_SIZE(frame) &&
+ frame->version == 1 &&
+ frame->encoder == ZMQ_ENCODER_FDK) {
+ datalen = frame->datasize;
+ data = ZMQ_FRAME_DATA(frame);
+ }
+
/* TS 102 563, Section 6:
* Audio super frames are transported in five successive DAB logical frames
* with additional error protection.
*/
- if (msg.size() == 5*framesize)
+ if (datalen)
{
- if (m_frame_buffer.size() > m_config.buffer_size) {
- etiLog.level(warn) <<
- "inputZMQ " << m_name <<
- " buffer full (" << m_frame_buffer.size() << "),"
- " dropping incoming superframe !";
- messageReceived = 0;
- }
- else if (m_enable_input) {
- // copy the input frame blockwise into the frame_buffer
- for (char* framestart = data;
- framestart < &data[5*framesize];
- framestart += framesize) {
- char* frame = new char[framesize];
- memcpy(frame, framestart, framesize);
- m_frame_buffer.push_back(frame);
+ if (datalen == 5*framesize)
+ {
+ if (m_frame_buffer.size() > m_config.buffer_size) {
+ etiLog.level(warn) <<
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
+ " dropping incoming superframe !";
+ messageReceived = 0;
+ }
+ else if (m_enable_input) {
+ // copy the input frame blockwise into the frame_buffer
+ for (uint8_t* framestart = data;
+ framestart < &data[5*framesize];
+ framestart += framesize) {
+ uint8_t* audioframe = new uint8_t[framesize];
+ memcpy(audioframe, framestart, framesize);
+ m_frame_buffer.push_back(audioframe);
+ }
+ }
+ else {
+ datalen = 0;
}
}
else {
- return 0;
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " wrong data size: recv'd " << msg.size() <<
+ ", need " << 5*framesize << ".";
+
+ datalen = 0;
}
}
else {
etiLog.level(error) <<
"inputZMQ " << m_name <<
- " wrong data size: recv'd " << msg.size() <<
- ", need " << 5*framesize << ".";
+ " invalid frame received";
}
- return msg.size();
+ return datalen;
}
/********* REMOTE CONTROL ***********/
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index b74b1e8..3ef7233 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -51,6 +51,7 @@
#include <list>
#include <string>
+#include <stdint.h>
#include "zmq.hpp"
#include "dabInput.h"
#include "StatsServer.h"
@@ -127,6 +128,33 @@ struct dab_input_zmq_config_t
std::string curve_encoder_keyfile;
};
+#define ZMQ_ENCODER_FDK 1
+#define ZMQ_ENCODER_TOOLAME 2
+
+/* This defines the on-wire representation of a ZMQ message header.
+ *
+ * The data follows right after this header */
+struct zmq_frame_header_t
+{
+ uint16_t version; // we support version=1 now
+ uint16_t encoder; // see ZMQ_ENCODER_XYZ
+
+ /* length of the 'data' field */
+ uint32_t datasize;
+
+ /* Audio level, peak, linear PCM */
+ int16_t audiolevel_left;
+ int16_t audiolevel_right;
+
+ /* Data follows this header */
+} __attribute__ ((packed));
+
+/* The expected frame size incl data of the given frame */
+#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize)
+
+#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) )
+
+
class DabInputZmqBase : public DabInputBase, public RemoteControllable {
public:
DabInputZmqBase(const std::string name,
@@ -191,7 +219,7 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
bool m_enable_input;
/* stores elements of type char[<superframesize>] */
- std::list<char*> m_frame_buffer;
+ std::list<uint8_t*> m_frame_buffer;
dab_input_zmq_config_t m_config;