diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-11 15:52:08 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-11 15:52:08 +0200 |
commit | 9a1f5fa40020247ed25fbe553d2ce71fa53d9e95 (patch) | |
tree | f97a0ee97edb93c646a80b97656b927f4527ce69 /src/InputZeroMQReader.cpp | |
parent | 7514a3951bdee4fe71f877e83fc6acf18bc847e5 (diff) | |
download | dabmod-9a1f5fa40020247ed25fbe553d2ce71fa53d9e95.tar.gz dabmod-9a1f5fa40020247ed25fbe553d2ce71fa53d9e95.tar.bz2 dabmod-9a1f5fa40020247ed25fbe553d2ce71fa53d9e95.zip |
Add ZMQ input buffer size to RC
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index c7dfd84..f6c3c34 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -66,6 +66,13 @@ struct zmq_dab_message_t #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) +InputZeroMQReader::InputZeroMQReader() : + InputReader(), + RemoteControllable("inputzmq") +{ + RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)"); +} + InputZeroMQReader::~InputZeroMQReader() { m_running = false; @@ -125,6 +132,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return 0; } else if (incoming.size() == framesize) { + unique_lock<mutex> lock(m_last_in_messages_size_mutex); + m_last_in_messages_size--; + lock.unlock(); + memcpy(buffer, &incoming.front(), framesize); } else { @@ -242,6 +253,9 @@ void InputZeroMQReader::RecvProcess() queue_size = m_in_messages.push(move(buf)); etiLog.log(trace, "ZMQ,push %zu", queue_size); + + unique_lock<mutex> lock(m_last_in_messages_size_mutex); + m_last_in_messages_size++; } } } @@ -286,5 +300,43 @@ void InputZeroMQReader::RecvProcess() m_in_messages.notify(); } +// ======================================= +// Remote Control +// ======================================= +void InputZeroMQReader::set_parameter(const string& parameter, const string& value) +{ + stringstream ss(value); + ss.exceptions ( stringstream::failbit | stringstream::badbit ); + + if (parameter == "buffer") { + throw ParameterError("Parameter " + parameter + " is read-only."); + } + else { + stringstream ss_err; + ss_err << "Parameter '" << parameter + << "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss_err.str()); + } +} + +const string InputZeroMQReader::get_parameter(const string& parameter) const +{ + stringstream ss; + ss << std::fixed; + if (parameter == "buffer") { + // Do not use size of the queue, as it will contain empty + // frames to signal timeouts + unique_lock<mutex> lock(m_last_in_messages_size_mutex); + const long time_in_buffer_us = 24000 * m_last_in_messages_size; + ss << time_in_buffer_us; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); +} + #endif |