From 9a1f5fa40020247ed25fbe553d2ce71fa53d9e95 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 11 Apr 2018 15:52:08 +0200 Subject: Add ZMQ input buffer size to RC --- src/InputZeroMQReader.cpp | 52 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) (limited to 'src/InputZeroMQReader.cpp') diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index c7dfd84..f6c3c34 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -66,6 +66,13 @@ struct zmq_dab_message_t #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) +InputZeroMQReader::InputZeroMQReader() : + InputReader(), + RemoteControllable("inputzmq") +{ + RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)"); +} + InputZeroMQReader::~InputZeroMQReader() { m_running = false; @@ -125,6 +132,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return 0; } else if (incoming.size() == framesize) { + unique_lock lock(m_last_in_messages_size_mutex); + m_last_in_messages_size--; + lock.unlock(); + memcpy(buffer, &incoming.front(), framesize); } else { @@ -242,6 +253,9 @@ void InputZeroMQReader::RecvProcess() queue_size = m_in_messages.push(move(buf)); etiLog.log(trace, "ZMQ,push %zu", queue_size); + + unique_lock lock(m_last_in_messages_size_mutex); + m_last_in_messages_size++; } } } @@ -286,5 +300,43 @@ void InputZeroMQReader::RecvProcess() m_in_messages.notify(); } +// ======================================= +// Remote Control +// ======================================= +void InputZeroMQReader::set_parameter(const string& parameter, const string& value) +{ + stringstream ss(value); + ss.exceptions ( stringstream::failbit | stringstream::badbit ); + + if (parameter == "buffer") { + throw ParameterError("Parameter " + parameter + " is read-only."); + } + else { + stringstream ss_err; + ss_err << "Parameter '" << parameter + << "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss_err.str()); + } +} + +const string InputZeroMQReader::get_parameter(const string& parameter) const +{ + stringstream ss; + ss << std::fixed; + if (parameter == "buffer") { + // Do not use size of the queue, as it will contain empty + // frames to signal timeouts + unique_lock lock(m_last_in_messages_size_mutex); + const long time_in_buffer_us = 24000 * m_last_in_messages_size; + ss << time_in_buffer_us; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); +} + #endif -- cgit v1.2.3