From b6071e227819f54943933ed7a4ec0eaa4934ba4e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 26 Mar 2014 18:22:37 +0100 Subject: Change ZMQ input message format --- src/InputReader.h | 4 +-- src/InputZeroMQReader.cpp | 82 ++++++++++++++++++++++++++++++++++++----------- 2 files changed, 65 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/InputReader.h b/src/InputReader.h index 6a7d7c3..164c5ac 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -135,7 +135,7 @@ class InputFileReader : public InputReader struct InputZeroMQThreadData { - ThreadsafeQueue *in_messages; + ThreadsafeQueue *in_messages; std::string uri; }; @@ -191,7 +191,7 @@ class InputZeroMQReader : public InputReader std::string uri_; InputZeroMQWorker worker_; - ThreadsafeQueue in_messages_; + ThreadsafeQueue in_messages_; struct InputZeroMQThreadData workerdata_; }; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 52f651b..cf3f7aa 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -41,6 +41,27 @@ #define MAX_QUEUE_SIZE 50 +#define NUM_FRAMES_PER_ZMQ_MESSAGE 4 +/* A concatenation of four ETI frames, + * whose maximal size is 6144. + * + * Four frames in one zmq message are sent, so that + * we do not risk breaking ETI vs. transmission frame + * phase. + * + * The frames are concatenated in buf, and + * their sizes is given in the buflen array. + * + * Most of the time, the buf will not be completely + * filled + */ +struct zmq_dab_message_t +{ + uint32_t version; + uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; + uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; +}; + int InputZeroMQReader::Open(std::string uri) { uri_ = uri; @@ -53,27 +74,16 @@ int InputZeroMQReader::Open(std::string uri) int InputZeroMQReader::GetNextFrame(void* buffer) { - zmq::message_t* incoming; - in_messages_.wait_and_pop(incoming); - - size_t framesize = incoming->size(); + const size_t framesize = 6144; - // guarantee that we never will write more than 6144 bytes - if (framesize > 6144) { - fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize); - logger_.level(error) << "ZeroMQ message too large" << framesize; - return -1; - } + uint8_t* incoming; + in_messages_.wait_and_pop(incoming); - memcpy(buffer, incoming->data(), framesize); + memcpy(buffer, incoming, framesize); delete incoming; - // pad to 6144 bytes - memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize); - - - return 6144; + return framesize; } void InputZeroMQReader::PrintInfo() @@ -118,9 +128,43 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) buffer_full = false; } - zmq::message_t* holder = new zmq::message_t(); - holder->move(&incoming); // move the message into the holder - queue_size = workerdata->in_messages->push(holder); + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); + + if (dab_msg->version != 1) { + fprintf(stderr, "ZeroMQ input: wrong packet version %d\n", + dab_msg->version); + } + + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) + { + if (dab_msg->buflen[i] <= 0 || + dab_msg->buflen[i] > 6144) + { + fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n", + i, dab_msg->buflen[i]); + // TODO error handling + } + else { + uint8_t* buf = new uint8_t[6144]; + + const int framesize = dab_msg->buflen[i]; + + memcpy(buf, + ((uint8_t*)incoming.data()) + offset, + framesize); + + // pad to 6144 bytes + memset(&((uint8_t*)buf)[framesize], + 0x55, 6144 - framesize); + + offset += framesize; + + queue_size = workerdata->in_messages->push(buf); + } + } } else { workerdata->in_messages->notify(); -- cgit v1.2.3