aboutsummaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-02-22 17:56:49 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-02-22 21:03:35 +0100
commit1ada0901a8fa687576fa4953044fd43bc6c06f8a (patch)
tree923be8c22fc54a25011f84a966b896f850990c69 /src/InputZeroMQReader.cpp
parent7cee56f37001640b88f4ac1249624c9c9758e844 (diff)
downloaddabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.gz
dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.bz2
dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.zip
Move main flowgraph to distinct function
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp11
1 files changed, 11 insertions, 0 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 01d8720..5fab447 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
uint8_t* incoming;
in_messages_.wait_and_pop(incoming);
+ if (! workerdata_.running) {
+ throw std::overflow_error("InputZeroMQ worker dead");
+ }
+
memcpy(buffer, incoming, framesize);
delete incoming;
@@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
fprintf(stderr, "ZeroMQ buffer overfull !\n");
buffer_full = true;
+ throw std::runtime_error("ZMQ input full");
}
queue_size = workerdata->in_messages->size();
@@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
catch (zmq::error_t& err) {
fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
+ catch (std::exception& err) {
+ }
fprintf(stderr, "ZeroMQ input worker terminated\n");
subscriber.close();
+
+ workerdata->running = false;
+ workerdata->in_messages->notify();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
{
running = true;
+ workerdata->running = true;
recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
}