diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/InputZeroMQReader.cpp | 73 |
1 files changed, 44 insertions, 29 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 1418db7..783f0f5 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014, 2015 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -42,6 +42,8 @@ #include "PcDebug.h" #include "Utils.h" +using namespace std; + #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. @@ -63,7 +65,10 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(const std::string& uri, size_t max_queued_frames) +#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ + (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) + +int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) { // The URL might start with zmq+tcp:// if (uri.substr(0, 4) == "zmq+") { @@ -89,7 +94,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return 0; } - std::shared_ptr<std::vector<uint8_t> > incoming; + shared_ptr<vector<uint8_t> > incoming; /* Do some prebuffering because reads will happen in bursts * (4 ETI frames in TM1) and we should make sure that @@ -167,41 +172,51 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } else if (queue_size < workerdata->max_queued_frames) { if (buffer_full) { - etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements"; + etiLog.level(info) << "ZeroMQ buffer recovered: " << + queue_size << " elements"; buffer_full = false; } - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { + throw runtime_error("ZeroMQ packet too small for header"); } + else { + const zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) - { - if (dab_msg->buflen[i] <= 0 || - dab_msg->buflen[i] > 6144) - { - etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << - dab_msg->buflen[i]; + if (dab_msg->version != 1) { + etiLog.level(error) << + "ZeroMQ wrong packet version " << + dab_msg->version; } - else { - std::shared_ptr<std::vector<uint8_t> > buf = - std::make_shared<std::vector<uint8_t> >(6144, 0x55); - const int framesize = dab_msg->buflen[i]; + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] > 6144) { + stringstream ss; + ss << "ZeroMQ buffer " << i << + " has invalid buflen " << dab_msg->buflen[i]; + throw runtime_error(ss.str()); + } + else { + auto buf = make_shared<vector<uint8_t> >(6144, 0x55); + + const int framesize = dab_msg->buflen[i]; + + if ((ssize_t)incoming.size() < offset + framesize) { + throw runtime_error("ZeroMQ packet too small"); + } - memcpy(&buf->front(), - ((uint8_t*)incoming.data()) + offset, - framesize); + memcpy(&buf->front(), + ((uint8_t*)incoming.data()) + offset, + framesize); - offset += framesize; + offset += framesize; - queue_size = workerdata->in_messages->push(buf); - etiLog.log(trace, "ZMQ,push %zu", queue_size); + queue_size = workerdata->in_messages->push(buf); + etiLog.log(trace, "ZMQ,push %zu", queue_size); + } } } } @@ -212,7 +227,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) etiLog.level(warn) << "ZeroMQ buffer overfull !"; buffer_full = true; - throw std::runtime_error("ZMQ input full"); + throw runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); |