diff options
Diffstat (limited to 'src/InputReader.h')
-rw-r--r-- | src/InputReader.h | 62 |
1 files changed, 17 insertions, 45 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index c897c2d..07326cf 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -162,65 +162,37 @@ struct zmq_input_overflow : public std::exception #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ -struct InputZeroMQThreadData -{ - ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages; - std::string uri; - size_t max_queued_frames; -}; - -class InputZeroMQWorker +class InputZeroMQReader : public InputReader { public: - void Start(struct InputZeroMQThreadData* workerdata); - void Stop(); - bool is_running(void) const { return running; } + InputZeroMQReader() = default; + InputZeroMQReader(const InputZeroMQReader& other) = delete; + InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; + ~InputZeroMQReader(); + + int Open(const std::string& uri, size_t max_queued_frames); + int GetNextFrame(void* buffer); + void PrintInfo() const; private: - std::atomic<bool> running = ATOMIC_VAR_INIT(false); + std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); + std::string m_uri; + size_t m_max_queued_frames = 0; + ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages; - void RecvProcess(struct InputZeroMQThreadData* workerdata); + void RecvProcess(void); - zmq::context_t zmqcontext; // is thread-safe - boost::thread recv_thread; + zmq::context_t m_zmqcontext; // is thread-safe + boost::thread m_recv_thread; /* We must be careful to keep frame phase consistent. If we * drop a single ETI frame, we will break the transmission * frame vs. ETI frame phase. * - * Here we keep track of how many ETI frames we must drop + * Here we keep track of how many ETI frames we must drop. */ int m_to_drop = 0; }; -class InputZeroMQReader : public InputReader -{ - public: - InputZeroMQReader() - { - workerdata_.in_messages = &in_messages_; - } - - ~InputZeroMQReader() - { - worker_.Stop(); - } - - int Open(const std::string& uri, size_t max_queued_frames); - - int GetNextFrame(void* buffer); - - void PrintInfo() const; - - private: - InputZeroMQReader(const InputZeroMQReader& other) = delete; - InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; - std::string uri_; - - InputZeroMQWorker worker_; - ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > in_messages_; - struct InputZeroMQThreadData workerdata_; -}; - #endif |