summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/InputZeroMQReader.cpp73
1 files changed, 44 insertions, 29 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 1418db7..783f0f5 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2013, 2014, 2015
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -42,6 +42,8 @@
#include "PcDebug.h"
#include "Utils.h"
+using namespace std;
+
#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
/* A concatenation of four ETI frames,
* whose maximal size is 6144.
@@ -63,7 +65,10 @@ struct zmq_dab_message_t
uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
};
-int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames)
+#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \
+ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t))
+
+int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
{
// The URL might start with zmq+tcp://
if (uri.substr(0, 4) == "zmq+") {
@@ -89,7 +94,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
return 0;
}
- std::shared_ptr<std::vector<uint8_t> > incoming;
+ shared_ptr<vector<uint8_t> > incoming;
/* Do some prebuffering because reads will happen in bursts
* (4 ETI frames in TM1) and we should make sure that
@@ -167,41 +172,51 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
}
else if (queue_size < workerdata->max_queued_frames) {
if (buffer_full) {
- etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements";
+ etiLog.level(info) << "ZeroMQ buffer recovered: " <<
+ queue_size << " elements";
buffer_full = false;
}
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
-
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) {
+ throw runtime_error("ZeroMQ packet too small for header");
}
+ else {
+ const zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
-
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++)
- {
- if (dab_msg->buflen[i] <= 0 ||
- dab_msg->buflen[i] > 6144)
- {
- etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
- dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) <<
+ "ZeroMQ wrong packet version " <<
+ dab_msg->version;
}
- else {
- std::shared_ptr<std::vector<uint8_t> > buf =
- std::make_shared<std::vector<uint8_t> >(6144, 0x55);
- const int framesize = dab_msg->buflen[i];
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] > 6144) {
+ stringstream ss;
+ ss << "ZeroMQ buffer " << i <<
+ " has invalid buflen " << dab_msg->buflen[i];
+ throw runtime_error(ss.str());
+ }
+ else {
+ auto buf = make_shared<vector<uint8_t> >(6144, 0x55);
+
+ const int framesize = dab_msg->buflen[i];
+
+ if ((ssize_t)incoming.size() < offset + framesize) {
+ throw runtime_error("ZeroMQ packet too small");
+ }
- memcpy(&buf->front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ memcpy(&buf->front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- offset += framesize;
+ offset += framesize;
- queue_size = workerdata->in_messages->push(buf);
- etiLog.log(trace, "ZMQ,push %zu", queue_size);
+ queue_size = workerdata->in_messages->push(buf);
+ etiLog.log(trace, "ZMQ,push %zu", queue_size);
+ }
}
}
}
@@ -212,7 +227,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
etiLog.level(warn) << "ZeroMQ buffer overfull !";
buffer_full = true;
- throw std::runtime_error("ZMQ input full");
+ throw runtime_error("ZMQ input full");
}
queue_size = workerdata->in_messages->size();