diff options
Diffstat (limited to 'src/InputReader.h')
-rw-r--r-- | src/InputReader.h | 76 |
1 files changed, 67 insertions, 9 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index dbc7c11..8917922 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -31,6 +31,10 @@ #endif #include <cstdio> +#if defined(HAVE_INPUT_ZEROMQ) +# include <zmq.hpp> +# include "ThreadsafeQueue.h" +#endif #include "porting.h" #include "Log.h" @@ -69,8 +73,13 @@ enum EtiStreamType { class InputReader { public: - // Save the next frame into the buffer, and return the number of bytes read. + // Put next frame into buffer. This function will never write more than + // 6144 bytes into buffer. + // returns number of bytes written to buffer, 0 on eof, -1 on error virtual int GetNextFrame(void* buffer) = 0; + + // Print some information + virtual void PrintInfo() = 0; }; class InputFileReader : public InputReader @@ -90,18 +99,12 @@ class InputFileReader : public InputReader } // open file and determine stream type - int Open(std::string filename); + // When loop=1, GetNextFrame will never return 0 + int Open(std::string filename, bool loop); // Print information about the file opened void PrintInfo(); - // Rewind the file, and replay anew - // returns 0 on success, -1 on failure - int Rewind(); - - // Put next frame into buffer. This function will never write more than - // 6144 bytes into buffer. - // returns number of bytes written to buffer, 0 on eof, -1 on error int GetNextFrame(void* buffer); EtiStreamType GetStreamType() @@ -112,6 +115,11 @@ class InputFileReader : public InputReader private: int IdentifyType(); + // Rewind the file, and replay anew + // returns 0 on success, -1 on failure + int Rewind(); + + bool loop_; // if shall we loop the file over and over std::string filename_; EtiStreamType streamtype_; FILE* inputfile_; @@ -122,4 +130,54 @@ class InputFileReader : public InputReader // after 2**32 * 24ms ~= 3.3 years }; +#if defined(HAVE_INPUT_ZEROMQ) +/* A ZeroMQ input. See www.zeromq.org for more info */ + +struct InputZeroMQThreadData +{ + ThreadsafeQueue<zmq::message_t*> *in_messages; + std::string uri; +}; + +class InputZeroMQWorker +{ + public: + InputZeroMQWorker() : + zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {} + + void Start(struct InputZeroMQThreadData* workerdata); + void Stop(); + private: + void RecvProcess(struct InputZeroMQThreadData* workerdata); + bool running; + zmq::context_t zmqcontext; + zmq::socket_t subscriber; + boost::thread recv_thread; +}; + +class InputZeroMQReader : public InputReader +{ + public: + InputZeroMQReader(Logger logger) : + logger_(logger), in_messages_(10) + { + workerdata_.in_messages = &in_messages_; + } + + int Open(std::string uri); + + int GetNextFrame(void* buffer); + + void PrintInfo(); + + private: + Logger logger_; + std::string uri_; + + InputZeroMQWorker worker_; + ThreadsafeQueue<zmq::message_t*> in_messages_; + struct InputZeroMQThreadData workerdata_; +}; + +#endif #endif |