summaryrefslogtreecommitdiffstats
path: root/src/InputReader.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/InputReader.h')
-rw-r--r--src/InputReader.h42
1 files changed, 29 insertions, 13 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index 3e0dcab..b262cc9 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyrigth (C) 2013
+ Copyrigth (C) 2013, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
@@ -31,6 +31,8 @@
#endif
#include <cstdio>
+#include <vector>
+#include <boost/shared_ptr.hpp>
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
# include "ThreadsafeQueue.h"
@@ -85,15 +87,15 @@ class InputReader
class InputFileReader : public InputReader
{
public:
- InputFileReader(Logger logger) :
+ InputFileReader() :
streamtype_(ETI_STREAM_TYPE_NONE),
- inputfile_(NULL), logger_(logger) {};
+ inputfile_(NULL) { }
~InputFileReader()
{
- fprintf(stderr, "\nClosing input file...\n");
-
if (inputfile_ != NULL) {
+ fprintf(stderr, "\nClosing input file...\n");
+
fclose(inputfile_);
}
}
@@ -113,6 +115,9 @@ class InputFileReader : public InputReader
}
private:
+ InputFileReader(const InputFileReader& other);
+ InputFileReader& operator=(const InputFileReader& other);
+
int IdentifyType();
// Rewind the file, and replay anew
@@ -123,20 +128,30 @@ class InputFileReader : public InputReader
std::string filename_;
EtiStreamType streamtype_;
FILE* inputfile_;
- Logger logger_;
size_t inputfilelength_;
uint64_t nbframes_; // 64-bit because 32-bit overflow is
// after 2**32 * 24ms ~= 3.3 years
};
+struct zmq_input_overflow : public std::exception
+{
+ const char* what () const throw ()
+ {
+ return "InputZMQ buffer overflow";
+ }
+};
+
#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */
struct InputZeroMQThreadData
{
- ThreadsafeQueue<uint8_t*> *in_messages;
+ ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > *in_messages;
std::string uri;
+ unsigned max_queued_frames;
+
+ bool running;
};
class InputZeroMQWorker
@@ -168,10 +183,10 @@ class InputZeroMQWorker
class InputZeroMQReader : public InputReader
{
public:
- InputZeroMQReader(Logger logger) :
- logger_(logger), in_messages_(10)
+ InputZeroMQReader()
{
workerdata_.in_messages = &in_messages_;
+ workerdata_.running = false;
}
~InputZeroMQReader()
@@ -179,21 +194,22 @@ class InputZeroMQReader : public InputReader
worker_.Stop();
}
- int Open(std::string uri);
+ int Open(const std::string& uri, unsigned max_queued_frames);
int GetNextFrame(void* buffer);
void PrintInfo();
private:
- InputZeroMQReader(const InputZeroMQReader& other) {}
- Logger logger_;
+ InputZeroMQReader(const InputZeroMQReader& other);
+ InputZeroMQReader& operator=(const InputZeroMQReader& other);
std::string uri_;
InputZeroMQWorker worker_;
- ThreadsafeQueue<uint8_t*> in_messages_;
+ ThreadsafeQueue<boost::shared_ptr<std::vector<uint8_t> > > in_messages_;
struct InputZeroMQThreadData workerdata_;
};
#endif
#endif
+