summaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp48
1 files changed, 26 insertions, 22 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index f8c15c4..36d4e4b 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -37,6 +37,7 @@
#include <stdint.h>
#include "zmq.hpp"
#include <boost/thread/thread.hpp>
+#include <boost/make_shared.hpp>
#include "porting.h"
#include "InputReader.h"
#include "PcDebug.h"
@@ -84,16 +85,25 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
{
const size_t framesize = 6144;
- uint8_t* incoming;
- in_messages_.wait_and_pop(incoming);
+ boost::shared_ptr<std::vector<uint8_t> > incoming;
+
+ /* Do some prebuffering because reads will happen in bursts
+ * (4 ETI frames in TM1) and we should make sure that
+ * we can serve the data required for a full transmission frame.
+ */
+ if (in_messages_.size() < 4) {
+ const size_t prebuffering = 10;
+ in_messages_.wait_and_pop(incoming, prebuffering);
+ }
+ else {
+ in_messages_.wait_and_pop(incoming);
+ }
if (! workerdata_.running) {
throw zmq_input_overflow();
}
- memcpy(buffer, incoming, framesize);
-
- delete incoming;
+ memcpy(buffer, &incoming->front(), framesize);
return framesize;
}
@@ -135,16 +145,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
else if (queue_size < workerdata->max_queued_frames) {
if (buffer_full) {
- fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
- queue_size);
+ etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements";
buffer_full = false;
}
zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
if (dab_msg->version != 1) {
- fprintf(stderr, "ZeroMQ input: wrong packet version %d\n",
- dab_msg->version);
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
}
int offset = sizeof(dab_msg->version) +
@@ -155,23 +163,20 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
if (dab_msg->buflen[i] <= 0 ||
dab_msg->buflen[i] > 6144)
{
- fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n",
- i, dab_msg->buflen[i]);
+ etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
+ dab_msg->buflen[i];
// TODO error handling
}
else {
- uint8_t* buf = new uint8_t[6144];
+ boost::shared_ptr<std::vector<uint8_t> > buf =
+ boost::make_shared<std::vector<uint8_t> >(6144, 0x55);
const int framesize = dab_msg->buflen[i];
- memcpy(buf,
+ memcpy(&buf->front(),
((uint8_t*)incoming.data()) + offset,
framesize);
- // pad to 6144 bytes
- memset(&((uint8_t*)buf)[framesize],
- 0x55, 6144 - framesize);
-
offset += framesize;
queue_size = workerdata->in_messages->push(buf);
@@ -182,7 +187,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
workerdata->in_messages->notify();
if (!buffer_full) {
- fprintf(stderr, "ZeroMQ buffer overfull !\n");
+ etiLog.level(warn) << "ZeroMQ buffer overfull !";
buffer_full = true;
throw std::runtime_error("ZMQ input full");
@@ -199,18 +204,17 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
if (queue_size < 5) {
- fprintf(stderr, "ZeroMQ buffer low: %zu elements !\n",
- queue_size);
+ etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !";
}
}
}
catch (zmq::error_t& err) {
- fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
+ etiLog.level(error) << "ZeroMQ error in RecvProcess: '" << err.what() << "'";
}
catch (std::exception& err) {
}
- fprintf(stderr, "ZeroMQ input worker terminated\n");
+ etiLog.level(info) << "ZeroMQ input worker terminated";
subscriber.close();