aboutsummaryrefslogtreecommitdiffstats
path: root/src/InputReader.h
diff options
context:
space:
mode:
authorMatthias P. Braendli (think) <matthias@mpb.li>2013-11-10 21:50:12 +0100
committerMatthias P. Braendli (think) <matthias@mpb.li>2013-11-10 21:50:12 +0100
commit5d965e80be2e6ab62bc82fb2e0d4d472153ad241 (patch)
tree5add36f337b0de524b3d098f0b1fcc8d68aba0d7 /src/InputReader.h
parent4f9a01a80570437b86e69eb0542b13df9a20743d (diff)
downloaddabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.gz
dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.bz2
dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.zip
crc-dabmod: add ZeroMQ input module
Diffstat (limited to 'src/InputReader.h')
-rw-r--r--src/InputReader.h76
1 files changed, 67 insertions, 9 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index dbc7c11..8917922 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -31,6 +31,10 @@
#endif
#include <cstdio>
+#if defined(HAVE_INPUT_ZEROMQ)
+# include <zmq.hpp>
+# include "ThreadsafeQueue.h"
+#endif
#include "porting.h"
#include "Log.h"
@@ -69,8 +73,13 @@ enum EtiStreamType {
class InputReader
{
public:
- // Save the next frame into the buffer, and return the number of bytes read.
+ // Put next frame into buffer. This function will never write more than
+ // 6144 bytes into buffer.
+ // returns number of bytes written to buffer, 0 on eof, -1 on error
virtual int GetNextFrame(void* buffer) = 0;
+
+ // Print some information
+ virtual void PrintInfo() = 0;
};
class InputFileReader : public InputReader
@@ -90,18 +99,12 @@ class InputFileReader : public InputReader
}
// open file and determine stream type
- int Open(std::string filename);
+ // When loop=1, GetNextFrame will never return 0
+ int Open(std::string filename, bool loop);
// Print information about the file opened
void PrintInfo();
- // Rewind the file, and replay anew
- // returns 0 on success, -1 on failure
- int Rewind();
-
- // Put next frame into buffer. This function will never write more than
- // 6144 bytes into buffer.
- // returns number of bytes written to buffer, 0 on eof, -1 on error
int GetNextFrame(void* buffer);
EtiStreamType GetStreamType()
@@ -112,6 +115,11 @@ class InputFileReader : public InputReader
private:
int IdentifyType();
+ // Rewind the file, and replay anew
+ // returns 0 on success, -1 on failure
+ int Rewind();
+
+ bool loop_; // if shall we loop the file over and over
std::string filename_;
EtiStreamType streamtype_;
FILE* inputfile_;
@@ -122,4 +130,54 @@ class InputFileReader : public InputReader
// after 2**32 * 24ms ~= 3.3 years
};
+#if defined(HAVE_INPUT_ZEROMQ)
+/* A ZeroMQ input. See www.zeromq.org for more info */
+
+struct InputZeroMQThreadData
+{
+ ThreadsafeQueue<zmq::message_t*> *in_messages;
+ std::string uri;
+};
+
+class InputZeroMQWorker
+{
+ public:
+ InputZeroMQWorker() :
+ zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {}
+
+ void Start(struct InputZeroMQThreadData* workerdata);
+ void Stop();
+ private:
+ void RecvProcess(struct InputZeroMQThreadData* workerdata);
+ bool running;
+ zmq::context_t zmqcontext;
+ zmq::socket_t subscriber;
+ boost::thread recv_thread;
+};
+
+class InputZeroMQReader : public InputReader
+{
+ public:
+ InputZeroMQReader(Logger logger) :
+ logger_(logger), in_messages_(10)
+ {
+ workerdata_.in_messages = &in_messages_;
+ }
+
+ int Open(std::string uri);
+
+ int GetNextFrame(void* buffer);
+
+ void PrintInfo();
+
+ private:
+ Logger logger_;
+ std::string uri_;
+
+ InputZeroMQWorker worker_;
+ ThreadsafeQueue<zmq::message_t*> in_messages_;
+ struct InputZeroMQThreadData workerdata_;
+};
+
+#endif
#endif