From 1ada0901a8fa687576fa4953044fd43bc6c06f8a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 22 Feb 2015 17:56:49 +0100 Subject: Move main flowgraph to distinct function --- src/InputZeroMQReader.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 01d8720..5fab447 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) uint8_t* incoming; in_messages_.wait_and_pop(incoming); + if (! workerdata_.running) { + throw std::overflow_error("InputZeroMQ worker dead"); + } + memcpy(buffer, incoming, framesize); delete incoming; @@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) fprintf(stderr, "ZeroMQ buffer overfull !\n"); buffer_full = true; + throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) catch (zmq::error_t& err) { fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + catch (std::exception& err) { + } fprintf(stderr, "ZeroMQ input worker terminated\n"); subscriber.close(); + + workerdata->running = false; + workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; + workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } -- cgit v1.2.3