diff options
-rw-r--r-- | src/InputReader.h | 8 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 9 |
2 files changed, 10 insertions, 7 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index b8c4fae..d2b5d8c 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -150,8 +150,6 @@ struct InputZeroMQThreadData ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages; std::string uri; unsigned max_queued_frames; - - bool running; }; class InputZeroMQWorker @@ -164,10 +162,13 @@ class InputZeroMQWorker void Start(struct InputZeroMQThreadData* workerdata); void Stop(); + + bool is_running(void) { return running; } private: + bool running; + void RecvProcess(struct InputZeroMQThreadData* workerdata); - bool running; zmq::context_t zmqcontext; // is thread-safe boost::thread recv_thread; @@ -186,7 +187,6 @@ class InputZeroMQReader : public InputReader InputZeroMQReader() { workerdata_.in_messages = &in_messages_; - workerdata_.running = false; } ~InputZeroMQReader() diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 70b0afc..58d76c9 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -84,6 +84,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) { const size_t framesize = 6144; + if (not worker_.is_running()) { + return 0; + } + std::shared_ptr<std::vector<uint8_t> > incoming; /* Do some prebuffering because reads will happen in bursts @@ -98,7 +102,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer) in_messages_.wait_and_pop(incoming); } - if (! workerdata_.running) { + if (not worker_.is_running()) { throw zmq_input_overflow(); } @@ -217,14 +221,13 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) subscriber.close(); - workerdata->running = false; + running = false; workerdata->in_messages->notify(); } void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) { running = true; - workerdata->running = true; recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); } |