diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-26 18:22:37 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2014-03-26 18:22:37 +0100 |
commit | b6071e227819f54943933ed7a4ec0eaa4934ba4e (patch) | |
tree | 54d9d22db644f2216a692aa5bc52e7345f61246b /src | |
parent | d43c72adb60395ea20550d49d0310aebbdca53a3 (diff) | |
download | dabmod-b6071e227819f54943933ed7a4ec0eaa4934ba4e.tar.gz dabmod-b6071e227819f54943933ed7a4ec0eaa4934ba4e.tar.bz2 dabmod-b6071e227819f54943933ed7a4ec0eaa4934ba4e.zip |
Change ZMQ input message format
Diffstat (limited to 'src')
-rw-r--r-- | src/InputReader.h | 4 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 82 |
2 files changed, 65 insertions, 21 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index 6a7d7c3..164c5ac 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -135,7 +135,7 @@ class InputFileReader : public InputReader struct InputZeroMQThreadData { - ThreadsafeQueue<zmq::message_t*> *in_messages; + ThreadsafeQueue<uint8_t*> *in_messages; std::string uri; }; @@ -191,7 +191,7 @@ class InputZeroMQReader : public InputReader std::string uri_; InputZeroMQWorker worker_; - ThreadsafeQueue<zmq::message_t*> in_messages_; + ThreadsafeQueue<uint8_t*> in_messages_; struct InputZeroMQThreadData workerdata_; }; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 52f651b..cf3f7aa 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -41,6 +41,27 @@ #define MAX_QUEUE_SIZE 50 +#define NUM_FRAMES_PER_ZMQ_MESSAGE 4 +/* A concatenation of four ETI frames, + * whose maximal size is 6144. + * + * Four frames in one zmq message are sent, so that + * we do not risk breaking ETI vs. transmission frame + * phase. + * + * The frames are concatenated in buf, and + * their sizes is given in the buflen array. + * + * Most of the time, the buf will not be completely + * filled + */ +struct zmq_dab_message_t +{ + uint32_t version; + uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; + uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; +}; + int InputZeroMQReader::Open(std::string uri) { uri_ = uri; @@ -53,27 +74,16 @@ int InputZeroMQReader::Open(std::string uri) int InputZeroMQReader::GetNextFrame(void* buffer) { - zmq::message_t* incoming; - in_messages_.wait_and_pop(incoming); - - size_t framesize = incoming->size(); + const size_t framesize = 6144; - // guarantee that we never will write more than 6144 bytes - if (framesize > 6144) { - fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize); - logger_.level(error) << "ZeroMQ message too large" << framesize; - return -1; - } + uint8_t* incoming; + in_messages_.wait_and_pop(incoming); - memcpy(buffer, incoming->data(), framesize); + memcpy(buffer, incoming, framesize); delete incoming; - // pad to 6144 bytes - memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize); - - - return 6144; + return framesize; } void InputZeroMQReader::PrintInfo() @@ -118,9 +128,43 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) buffer_full = false; } - zmq::message_t* holder = new zmq::message_t(); - holder->move(&incoming); // move the message into the holder - queue_size = workerdata->in_messages->push(holder); + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); + + if (dab_msg->version != 1) { + fprintf(stderr, "ZeroMQ input: wrong packet version %d\n", + dab_msg->version); + } + + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) + { + if (dab_msg->buflen[i] <= 0 || + dab_msg->buflen[i] > 6144) + { + fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n", + i, dab_msg->buflen[i]); + // TODO error handling + } + else { + uint8_t* buf = new uint8_t[6144]; + + const int framesize = dab_msg->buflen[i]; + + memcpy(buf, + ((uint8_t*)incoming.data()) + offset, + framesize); + + // pad to 6144 bytes + memset(&((uint8_t*)buf)[framesize], + 0x55, 6144 - framesize); + + offset += framesize; + + queue_size = workerdata->in_messages->push(buf); + } + } } else { workerdata->in_messages->notify(); |