diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-11 15:54:34 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-11 15:54:34 +0200 |
commit | 00d7d4141614ef295d8bbfeece0e9dc11152a6ae (patch) | |
tree | ca6825ef236a483404c05973974d5df5b6a34ce0 | |
parent | a1bd2965c58806dd9c2d0323b1f97deeff2f048f (diff) | |
parent | 9a1f5fa40020247ed25fbe553d2ce71fa53d9e95 (diff) | |
download | dabmod-00d7d4141614ef295d8bbfeece0e9dc11152a6ae.tar.gz dabmod-00d7d4141614ef295d8bbfeece0e9dc11152a6ae.tar.bz2 dabmod-00d7d4141614ef295d8bbfeece0e9dc11152a6ae.zip |
Merge branch 'next' into easydabv3
-rw-r--r-- | src/DabMod.cpp | 5 | ||||
-rw-r--r-- | src/InputReader.h | 17 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 52 | ||||
-rw-r--r-- | src/RemoteControl.cpp | 12 |
4 files changed, 82 insertions, 4 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp index f384e78..0d4a180 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -383,6 +383,7 @@ int launch_modulator(int argc, char* argv[]) #else auto inputZeroMQReader = make_shared<InputZeroMQReader>(); inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); inputReader = inputZeroMQReader; #endif } @@ -444,11 +445,13 @@ int launch_modulator(int argc, char* argv[]) } } #if defined(HAVE_ZEROMQ) - else if (dynamic_pointer_cast<InputZeroMQReader>(inputReader)) { + else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) { run_again = true; // Create a new input reader + rcs.remove_controllable(in_zmq.get()); auto inputZeroMQReader = make_shared<InputZeroMQReader>(); inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued); + rcs.enrol(inputZeroMQReader.get()); inputReader = inputZeroMQReader; } #endif diff --git a/src/InputReader.h b/src/InputReader.h index d229417..98eab2b 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -41,6 +41,7 @@ #if defined(HAVE_ZEROMQ) # include "zmq.hpp" # include "ThreadsafeQueue.h" +# include "RemoteControl.h" #endif #include "Log.h" #include "Socket.h" @@ -158,10 +159,10 @@ struct zmq_input_overflow : public std::exception #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ -class InputZeroMQReader : public InputReader +class InputZeroMQReader : public InputReader, public RemoteControllable { public: - InputZeroMQReader() = default; + InputZeroMQReader(); InputZeroMQReader(const InputZeroMQReader& other) = delete; InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; ~InputZeroMQReader(); @@ -170,12 +171,24 @@ class InputZeroMQReader : public InputReader virtual int GetNextFrame(void* buffer) override; virtual std::string GetPrintableInfo() const override; + /* Base function to set parameters. */ + virtual void set_parameter( + const std::string& parameter, + const std::string& value) override; + + /* Getting a parameter always returns a string. */ + virtual const std::string get_parameter( + const std::string& parameter) const override; + private: std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); std::string m_uri; size_t m_max_queued_frames = 0; ThreadsafeQueue<std::vector<uint8_t> > m_in_messages; + mutable std::mutex m_last_in_messages_size_mutex; + size_t m_last_in_messages_size = 0; + void RecvProcess(void); zmq::context_t m_zmqcontext; // is thread-safe diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index c7dfd84..f6c3c34 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -66,6 +66,13 @@ struct zmq_dab_message_t #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \ (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) +InputZeroMQReader::InputZeroMQReader() : + InputReader(), + RemoteControllable("inputzmq") +{ + RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)"); +} + InputZeroMQReader::~InputZeroMQReader() { m_running = false; @@ -125,6 +132,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return 0; } else if (incoming.size() == framesize) { + unique_lock<mutex> lock(m_last_in_messages_size_mutex); + m_last_in_messages_size--; + lock.unlock(); + memcpy(buffer, &incoming.front(), framesize); } else { @@ -242,6 +253,9 @@ void InputZeroMQReader::RecvProcess() queue_size = m_in_messages.push(move(buf)); etiLog.log(trace, "ZMQ,push %zu", queue_size); + + unique_lock<mutex> lock(m_last_in_messages_size_mutex); + m_last_in_messages_size++; } } } @@ -286,5 +300,43 @@ void InputZeroMQReader::RecvProcess() m_in_messages.notify(); } +// ======================================= +// Remote Control +// ======================================= +void InputZeroMQReader::set_parameter(const string& parameter, const string& value) +{ + stringstream ss(value); + ss.exceptions ( stringstream::failbit | stringstream::badbit ); + + if (parameter == "buffer") { + throw ParameterError("Parameter " + parameter + " is read-only."); + } + else { + stringstream ss_err; + ss_err << "Parameter '" << parameter + << "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss_err.str()); + } +} + +const string InputZeroMQReader::get_parameter(const string& parameter) const +{ + stringstream ss; + ss << std::fixed; + if (parameter == "buffer") { + // Do not use size of the queue, as it will contain empty + // frames to signal timeouts + unique_lock<mutex> lock(m_last_in_messages_size_mutex); + const long time_in_buffer_us = 24000 * m_last_in_messages_size; + ss << time_in_buffer_us; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); +} + #endif diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 6e6a7e9..a3d9ec3 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -53,6 +53,10 @@ RemoteControllerTelnet::~RemoteControllerTelnet() void RemoteControllerTelnet::restart() { + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + m_restarter_thread = std::thread( &RemoteControllerTelnet::restart_thread, this, 0); @@ -105,7 +109,9 @@ void RemoteControllerTelnet::restart_thread(long) m_active = false; m_io_service.stop(); - m_child_thread.join(); + if (m_child_thread.joinable()) { + m_child_thread.join(); + } m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); } @@ -342,6 +348,10 @@ RemoteControllerZmq::~RemoteControllerZmq() { void RemoteControllerZmq::restart() { + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); } |