diff options
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 33 |
1 files changed, 25 insertions, 8 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index cfb56b2..f8c15c4 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2013, 2014 + Copyright (C) 2013, 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -29,7 +29,7 @@ # include "config.h" #endif -#if defined(HAVE_INPUT_ZEROMQ) +#if defined(HAVE_ZEROMQ) #include <string> #include <cstring> @@ -41,8 +41,6 @@ #include "InputReader.h" #include "PcDebug.h" -#define MAX_QUEUE_SIZE 50 - #define NUM_FRAMES_PER_ZMQ_MESSAGE 4 /* A concatenation of four ETI frames, * whose maximal size is 6144. @@ -64,10 +62,18 @@ struct zmq_dab_message_t uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; }; -int InputZeroMQReader::Open(std::string uri) +int InputZeroMQReader::Open(const std::string& uri, unsigned max_queued_frames) { - uri_ = uri; + // The URL might start with zmq+tcp:// + if (uri.substr(0, 4) == "zmq+") { + uri_ = uri.substr(4); + } + else { + uri_ = uri; + } + workerdata_.uri = uri; + workerdata_.max_queued_frames = max_queued_frames; // launch receiver thread worker_.Start(&workerdata_); @@ -81,6 +87,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) uint8_t* incoming; in_messages_.wait_and_pop(incoming); + if (! workerdata_.running) { + throw zmq_input_overflow(); + } + memcpy(buffer, incoming, framesize); delete incoming; @@ -123,7 +133,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } m_to_drop--; } - else if (queue_size < MAX_QUEUE_SIZE) { + else if (queue_size < workerdata->max_queued_frames) { if (buffer_full) { fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n", queue_size); @@ -175,6 +185,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) fprintf(stderr, "ZeroMQ buffer overfull !\n"); buffer_full = true; + throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -188,7 +199,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) } if (queue_size < 5) { - fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n", + fprintf(stderr, "ZeroMQ buffer low: %zu elements !\n", queue_size); } } @@ -196,15 +207,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) catch (zmq::error_t& err) { fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + catch (std::exception& err) { + } fprintf(stderr, "ZeroMQ input worker terminated\n"); subscriber.close(); + + workerdata->running = false; + workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; + workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } |