From 7514a3951bdee4fe71f877e83fc6acf18bc847e5 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 11 Apr 2018 15:29:49 +0200 Subject: Fix thread handling in remotecontrol --- src/RemoteControl.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 6e6a7e9..a3d9ec3 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -53,6 +53,10 @@ RemoteControllerTelnet::~RemoteControllerTelnet() void RemoteControllerTelnet::restart() { + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + m_restarter_thread = std::thread( &RemoteControllerTelnet::restart_thread, this, 0); @@ -105,7 +109,9 @@ void RemoteControllerTelnet::restart_thread(long) m_active = false; m_io_service.stop(); - m_child_thread.join(); + if (m_child_thread.joinable()) { + m_child_thread.join(); + } m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); } @@ -342,6 +348,10 @@ RemoteControllerZmq::~RemoteControllerZmq() { void RemoteControllerZmq::restart() { + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); } -- cgit v1.2.3 From 9a1f5fa40020247ed25fbe553d2ce71fa53d9e95 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 11 Apr 2018 15:52:08 +0200 Subject: Add ZMQ input buffer size to RC --- src/DabMod.cpp | 5 ++++- src/InputReader.h | 17 ++++++++++++++-- src/InputZeroMQReader.cpp | 52 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index f384e78..0d4a180 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -383,6 +383,7 @@ int launch_modulator(int argc, char* argv[]) #else auto inputZeroMQReader = make_shared(); inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); inputReader = inputZeroMQReader; #endif } @@ -444,11 +445,13 @@ int launch_modulator(int argc, char* argv[]) } } #if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast(inputReader)) { + else if (auto in_zmq = dynamic_pointer_cast(inputReader)) { run_again = true; // Create a new input reader + rcs.remove_controllable(in_zmq.get()); auto inputZeroMQReader = make_shared(); inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); inputReader = inputZeroMQReader; } #endif diff --git a/src/InputReader.h b/src/InputReader.h index d229417..98eab2b 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -41,6 +41,7 @@ #if defined(HAVE_ZEROMQ) # include "zmq.hpp" # include "ThreadsafeQueue.h" +# include "RemoteControl.h" #endif #include "Log.h" #include "Socket.h" @@ -158,10 +159,10 @@ struct zmq_input_overflow : public std::exception #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ -class InputZeroMQReader : public InputReader +class InputZeroMQReader : public InputReader, public RemoteControllable { public: - InputZeroMQReader() = default; + InputZeroMQReader(); InputZeroMQReader(const InputZeroMQReader& other) = delete; InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; ~InputZeroMQReader(); @@ -170,12 +171,24 @@ class InputZeroMQReader : public InputReader virtual int GetNextFrame(void* buffer) override; virtual std::string GetPrintableInfo() const override; + /* Base function to set parameters. */ + virtual void set_parameter( + const std::string& parameter, + const std::string& value) override; + + /* Getting a parameter always returns a string. */ + virtual const std::string get_parameter( + const std::string& parameter) const override; + private: std::atomic m_running = ATOMIC_VAR_INIT(false); std::string m_uri; size_t m_max_queued_frames = 0; ThreadsafeQueue > m_in_messages; + mutable std::mutex m_last_in_messages_size_mutex; + size_t m_last_in_messages_size = 0; + void RecvProcess(void); zmq::context_t m_zmqcontext; // is thread-safe diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index c7dfd84..f6c3c34 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -66,6 +66,13 @@ struct zmq_dab_message_t #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) +InputZeroMQReader::InputZeroMQReader() : + InputReader(), + RemoteControllable("inputzmq") +{ + RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)"); +} + InputZeroMQReader::~InputZeroMQReader() { m_running = false; @@ -125,6 +132,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return 0; } else if (incoming.size() == framesize) { + unique_lock lock(m_last_in_messages_size_mutex); + m_last_in_messages_size--; + lock.unlock(); + memcpy(buffer, &incoming.front(), framesize); } else { @@ -242,6 +253,9 @@ void InputZeroMQReader::RecvProcess() queue_size = m_in_messages.push(move(buf)); etiLog.log(trace, "ZMQ,push %zu", queue_size); + + unique_lock lock(m_last_in_messages_size_mutex); + m_last_in_messages_size++; } } } @@ -286,5 +300,43 @@ void InputZeroMQReader::RecvProcess() m_in_messages.notify(); } +// ======================================= +// Remote Control +// ======================================= +void InputZeroMQReader::set_parameter(const string& parameter, const string& value) +{ + stringstream ss(value); + ss.exceptions ( stringstream::failbit | stringstream::badbit ); + + if (parameter == "buffer") { + throw ParameterError("Parameter " + parameter + " is read-only."); + } + else { + stringstream ss_err; + ss_err << "Parameter '" << parameter + << "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss_err.str()); + } +} + +const string InputZeroMQReader::get_parameter(const string& parameter) const +{ + stringstream ss; + ss << std::fixed; + if (parameter == "buffer") { + // Do not use size of the queue, as it will contain empty + // frames to signal timeouts + unique_lock lock(m_last_in_messages_size_mutex); + const long time_in_buffer_us = 24000 * m_last_in_messages_size; + ss << time_in_buffer_us; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); +} + #endif -- cgit v1.2.3