summaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp52
1 files changed, 52 insertions, 0 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index c7dfd84..f6c3c34 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -66,6 +66,13 @@ struct zmq_dab_message_t
#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \
(sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t))
+InputZeroMQReader::InputZeroMQReader() :
+ InputReader(),
+ RemoteControllable("inputzmq")
+{
+ RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)");
+}
+
InputZeroMQReader::~InputZeroMQReader()
{
m_running = false;
@@ -125,6 +132,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
return 0;
}
else if (incoming.size() == framesize) {
+ unique_lock<mutex> lock(m_last_in_messages_size_mutex);
+ m_last_in_messages_size--;
+ lock.unlock();
+
memcpy(buffer, &incoming.front(), framesize);
}
else {
@@ -242,6 +253,9 @@ void InputZeroMQReader::RecvProcess()
queue_size = m_in_messages.push(move(buf));
etiLog.log(trace, "ZMQ,push %zu", queue_size);
+
+ unique_lock<mutex> lock(m_last_in_messages_size_mutex);
+ m_last_in_messages_size++;
}
}
}
@@ -286,5 +300,43 @@ void InputZeroMQReader::RecvProcess()
m_in_messages.notify();
}
+// =======================================
+// Remote Control
+// =======================================
+void InputZeroMQReader::set_parameter(const string& parameter, const string& value)
+{
+ stringstream ss(value);
+ ss.exceptions ( stringstream::failbit | stringstream::badbit );
+
+ if (parameter == "buffer") {
+ throw ParameterError("Parameter " + parameter + " is read-only.");
+ }
+ else {
+ stringstream ss_err;
+ ss_err << "Parameter '" << parameter
+ << "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss_err.str());
+ }
+}
+
+const string InputZeroMQReader::get_parameter(const string& parameter) const
+{
+ stringstream ss;
+ ss << std::fixed;
+ if (parameter == "buffer") {
+ // Do not use size of the queue, as it will contain empty
+ // frames to signal timeouts
+ unique_lock<mutex> lock(m_last_in_messages_size_mutex);
+ const long time_in_buffer_us = 24000 * m_last_in_messages_size;
+ ss << time_in_buffer_us;
+ }
+ else {
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+}
+
#endif