diff options
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 75 |
1 files changed, 48 insertions, 27 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index f7f5702..36d4e4b 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2013, 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -37,12 +37,11 @@ #include <stdint.h> #include "zmq.hpp" #include <boost/thread/thread.hpp> +#include <boost/make_shared.hpp> #include "porting.h" #include "InputReader.h" #include "PcDebug.h" -#define MAX_QUEUE_SIZE 50 - #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. @@ -64,10 +63,18 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(std::string uri) +int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames) { - uri_ = uri; + // The URL might start with zmq+tcp:// + if (uri.substr(0, 4) == "zmq+") { + uri_ = uri.substr(4); + } + else { + uri_ = uri; + } + workerdata_.uri = uri; + workerdata_.max_queued_frames = max_queued_frames; // launch receiver thread worker_.Start(&workerdata_); @@ -78,12 +85,25 @@ int InputZeroMQReader::GetNextFrame(void* buffer) { const size_t framesize = 6144; - uint8_t* incoming; - in_messages_.wait_and_pop(incoming); + boost::shared_ptr<std::vector<uint8_t> > incoming; - memcpy(buffer, incoming, framesize); + /* Do some prebuffering because reads will happen in bursts + * (4 ETI frames in TM1) and we should make sure that + * we can serve the data required for a full transmission frame. + */ + if (in_messages_.size() < 4) { + const size_t prebuffering = 10; + in_messages_.wait_and_pop(incoming, prebuffering); + } + else { + in_messages_.wait_and_pop(incoming); + } - delete incoming; + if (! workerdata_.running) { + throw zmq_input_overflow(); + } + + memcpy(buffer, &incoming->front(), framesize); return framesize; } @@ -123,18 +143,16 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } m_to_drop--; } - else if (queue_size < MAX_QUEUE_SIZE) { + else if (queue_size < workerdata->max_queued_frames) { if (buffer_full) { - fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", - queue_size); + etiLog.level(info) << "ZeroMQ buffer recovered: " << queue_size << " elements"; buffer_full = false; } zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); if (dab_msg->version != 1) { - fprintf(stderr, "ZeroMQ input: wrong packet version %d\n", - dab_msg->version); + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; } int offset = sizeof(dab_msg->version) + @@ -145,23 +163,20 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) if (dab_msg->buflen[i] <= 0 || dab_msg->buflen[i] > 6144) { - fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n", - i, dab_msg->buflen[i]); + etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << + dab_msg->buflen[i]; // TODO error handling } else { - uint8_t* buf = new uint8_t[6144]; + boost::shared_ptr<std::vector<uint8_t> > buf = + boost::make_shared<std::vector<uint8_t> >(6144, 0x55); const int framesize = dab_msg->buflen[i]; - memcpy(buf, + memcpy(&buf->front(), ((uint8_t*)incoming.data()) + offset, framesize); - // pad to 6144 bytes - memset(&((uint8_t*)buf)[framesize], - 0x55, 6144 - framesize); - offset += framesize; queue_size = workerdata->in_messages->push(buf); @@ -172,9 +187,10 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) workerdata->in_messages->notify(); if (!buffer_full) { - fprintf(stderr, "ZeroMQ buffer overfull !\n"); + etiLog.level(warn) << "ZeroMQ buffer overfull !"; buffer_full = true; + throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -188,23 +204,28 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } if (queue_size < 5) { - fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", - queue_size); + etiLog.level(warn) << "ZeroMQ buffer low: " << queue_size << " elements !"; } } } catch (zmq::error_t& err) { - fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); + etiLog.level(error) << "ZeroMQ error in RecvProcess: '" << err.what() << "'"; + } + catch (std::exception& err) { } - fprintf(stderr, "ZeroMQ input worker terminated\n"); + etiLog.level(info) << "ZeroMQ input worker terminated"; subscriber.close(); + + workerdata->running = false; + workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; + workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } |