summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-26 18:22:37 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-03-26 18:22:37 +0100
commitb6071e227819f54943933ed7a4ec0eaa4934ba4e (patch)
tree54d9d22db644f2216a692aa5bc52e7345f61246b
parentd43c72adb60395ea20550d49d0310aebbdca53a3 (diff)
downloaddabmod-b6071e227819f54943933ed7a4ec0eaa4934ba4e.tar.gz
dabmod-b6071e227819f54943933ed7a4ec0eaa4934ba4e.tar.bz2
dabmod-b6071e227819f54943933ed7a4ec0eaa4934ba4e.zip
Change ZMQ input message format
-rw-r--r--src/InputReader.h4
-rw-r--r--src/InputZeroMQReader.cpp82
2 files changed, 65 insertions, 21 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index 6a7d7c3..164c5ac 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -135,7 +135,7 @@ class InputFileReader : public InputReader
struct InputZeroMQThreadData
{
- ThreadsafeQueue<zmq::message_t*> *in_messages;
+ ThreadsafeQueue<uint8_t*> *in_messages;
std::string uri;
};
@@ -191,7 +191,7 @@ class InputZeroMQReader : public InputReader
std::string uri_;
InputZeroMQWorker worker_;
- ThreadsafeQueue<zmq::message_t*> in_messages_;
+ ThreadsafeQueue<uint8_t*> in_messages_;
struct InputZeroMQThreadData workerdata_;
};
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 52f651b..cf3f7aa 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -41,6 +41,27 @@
#define MAX_QUEUE_SIZE 50
+#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
+/* A concatenation of four ETI frames,
+ * whose maximal size is 6144.
+ *
+ * Four frames in one zmq message are sent, so that
+ * we do not risk breaking ETI vs. transmission frame
+ * phase.
+ *
+ * The frames are concatenated in buf, and
+ * their sizes is given in the buflen array.
+ *
+ * Most of the time, the buf will not be completely
+ * filled
+ */
+struct zmq_dab_message_t
+{
+ uint32_t version;
+ uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE];
+ uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
+};
+
int InputZeroMQReader::Open(std::string uri)
{
uri_ = uri;
@@ -53,27 +74,16 @@ int InputZeroMQReader::Open(std::string uri)
int InputZeroMQReader::GetNextFrame(void* buffer)
{
- zmq::message_t* incoming;
- in_messages_.wait_and_pop(incoming);
-
- size_t framesize = incoming->size();
+ const size_t framesize = 6144;
- // guarantee that we never will write more than 6144 bytes
- if (framesize > 6144) {
- fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize);
- logger_.level(error) << "ZeroMQ message too large" << framesize;
- return -1;
- }
+ uint8_t* incoming;
+ in_messages_.wait_and_pop(incoming);
- memcpy(buffer, incoming->data(), framesize);
+ memcpy(buffer, incoming, framesize);
delete incoming;
- // pad to 6144 bytes
- memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize);
-
-
- return 6144;
+ return framesize;
}
void InputZeroMQReader::PrintInfo()
@@ -118,9 +128,43 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
buffer_full = false;
}
- zmq::message_t* holder = new zmq::message_t();
- holder->move(&incoming); // move the message into the holder
- queue_size = workerdata->in_messages->push(holder);
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+
+ if (dab_msg->version != 1) {
+ fprintf(stderr, "ZeroMQ input: wrong packet version %d\n",
+ dab_msg->version);
+ }
+
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++)
+ {
+ if (dab_msg->buflen[i] <= 0 ||
+ dab_msg->buflen[i] > 6144)
+ {
+ fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n",
+ i, dab_msg->buflen[i]);
+ // TODO error handling
+ }
+ else {
+ uint8_t* buf = new uint8_t[6144];
+
+ const int framesize = dab_msg->buflen[i];
+
+ memcpy(buf,
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
+
+ // pad to 6144 bytes
+ memset(&((uint8_t*)buf)[framesize],
+ 0x55, 6144 - framesize);
+
+ offset += framesize;
+
+ queue_size = workerdata->in_messages->push(buf);
+ }
+ }
}
else {
workerdata->in_messages->notify();