diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 17:56:49 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 21:03:35 +0100 | 
| commit | 1ada0901a8fa687576fa4953044fd43bc6c06f8a (patch) | |
| tree | 923be8c22fc54a25011f84a966b896f850990c69 /src/InputZeroMQReader.cpp | |
| parent | 7cee56f37001640b88f4ac1249624c9c9758e844 (diff) | |
| download | dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.gz dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.bz2 dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.zip | |
Move main flowgraph to distinct function
Diffstat (limited to 'src/InputZeroMQReader.cpp')
| -rw-r--r-- | src/InputZeroMQReader.cpp | 11 | 
1 files changed, 11 insertions, 0 deletions
| diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 01d8720..5fab447 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)      uint8_t* incoming;      in_messages_.wait_and_pop(incoming); +    if (! workerdata_.running) { +        throw std::overflow_error("InputZeroMQ worker dead"); +    } +      memcpy(buffer, incoming, framesize);      delete incoming; @@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                      fprintf(stderr, "ZeroMQ buffer overfull !\n");                      buffer_full = true; +                    throw std::runtime_error("ZMQ input full");                  }                  queue_size = workerdata->in_messages->size(); @@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)      catch (zmq::error_t& err) {          fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());      } +    catch (std::exception& err) { +    }      fprintf(stderr, "ZeroMQ input worker terminated\n");      subscriber.close(); + +    workerdata->running = false; +    workerdata->in_messages->notify();  }  void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)  {      running = true; +    workerdata->running = true;      recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);  } | 
