diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 17:56:49 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 21:03:35 +0100 |
commit | 1ada0901a8fa687576fa4953044fd43bc6c06f8a (patch) | |
tree | 923be8c22fc54a25011f84a966b896f850990c69 /src/InputZeroMQReader.cpp | |
parent | 7cee56f37001640b88f4ac1249624c9c9758e844 (diff) | |
download | dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.gz dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.bz2 dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.zip |
Move main flowgraph to distinct function
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 11 |
1 files changed, 11 insertions, 0 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 01d8720..5fab447 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) uint8_t* incoming; in_messages_.wait_and_pop(incoming); + if (! workerdata_.running) { + throw std::overflow_error("InputZeroMQ worker dead"); + } + memcpy(buffer, incoming, framesize); delete incoming; @@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) fprintf(stderr, "ZeroMQ buffer overfull !\n"); buffer_full = true; + throw std::runtime_error("ZMQ input full"); } queue_size = workerdata->in_messages->size(); @@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) catch (zmq::error_t& err) { fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what()); } + catch (std::exception& err) { + } fprintf(stderr, "ZeroMQ input worker terminated\n"); subscriber.close(); + + workerdata->running = false; + workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; + workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } |