summaryrefslogtreecommitdiffstats
path: root/src/InputReader.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-11 15:52:08 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-11 15:52:08 +0200
commit9a1f5fa40020247ed25fbe553d2ce71fa53d9e95 (patch)
treef97a0ee97edb93c646a80b97656b927f4527ce69 /src/InputReader.h
parent7514a3951bdee4fe71f877e83fc6acf18bc847e5 (diff)
downloaddabmod-9a1f5fa40020247ed25fbe553d2ce71fa53d9e95.tar.gz
dabmod-9a1f5fa40020247ed25fbe553d2ce71fa53d9e95.tar.bz2
dabmod-9a1f5fa40020247ed25fbe553d2ce71fa53d9e95.zip
Add ZMQ input buffer size to RC
Diffstat (limited to 'src/InputReader.h')
-rw-r--r--src/InputReader.h17
1 files changed, 15 insertions, 2 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index d229417..98eab2b 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -41,6 +41,7 @@
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
# include "ThreadsafeQueue.h"
+# include "RemoteControl.h"
#endif
#include "Log.h"
#include "Socket.h"
@@ -158,10 +159,10 @@ struct zmq_input_overflow : public std::exception
#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */
-class InputZeroMQReader : public InputReader
+class InputZeroMQReader : public InputReader, public RemoteControllable
{
public:
- InputZeroMQReader() = default;
+ InputZeroMQReader();
InputZeroMQReader(const InputZeroMQReader& other) = delete;
InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
~InputZeroMQReader();
@@ -170,12 +171,24 @@ class InputZeroMQReader : public InputReader
virtual int GetNextFrame(void* buffer) override;
virtual std::string GetPrintableInfo() const override;
+ /* Base function to set parameters. */
+ virtual void set_parameter(
+ const std::string& parameter,
+ const std::string& value) override;
+
+ /* Getting a parameter always returns a string. */
+ virtual const std::string get_parameter(
+ const std::string& parameter) const override;
+
private:
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_uri;
size_t m_max_queued_frames = 0;
ThreadsafeQueue<std::vector<uint8_t> > m_in_messages;
+ mutable std::mutex m_last_in_messages_size_mutex;
+ size_t m_last_in_messages_size = 0;
+
void RecvProcess(void);
zmq::context_t m_zmqcontext; // is thread-safe