aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/DabMod.cpp5
-rw-r--r--src/InputReader.h17
-rw-r--r--src/InputZeroMQReader.cpp52
-rw-r--r--src/RemoteControl.cpp12
4 files changed, 82 insertions, 4 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index f384e78..0d4a180 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -383,6 +383,7 @@ int launch_modulator(int argc, char* argv[])
#else
auto inputZeroMQReader = make_shared<InputZeroMQReader>();
inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
inputReader = inputZeroMQReader;
#endif
}
@@ -444,11 +445,13 @@ int launch_modulator(int argc, char* argv[])
}
}
#if defined(HAVE_ZEROMQ)
- else if (dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
+ else if (auto in_zmq = dynamic_pointer_cast<InputZeroMQReader>(inputReader)) {
run_again = true;
// Create a new input reader
+ rcs.remove_controllable(in_zmq.get());
auto inputZeroMQReader = make_shared<InputZeroMQReader>();
inputZeroMQReader->Open(mod_settings.inputName, mod_settings.inputMaxFramesQueued);
+ rcs.enrol(inputZeroMQReader.get());
inputReader = inputZeroMQReader;
}
#endif
diff --git a/src/InputReader.h b/src/InputReader.h
index d229417..98eab2b 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -41,6 +41,7 @@
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
# include "ThreadsafeQueue.h"
+# include "RemoteControl.h"
#endif
#include "Log.h"
#include "Socket.h"
@@ -158,10 +159,10 @@ struct zmq_input_overflow : public std::exception
#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */
-class InputZeroMQReader : public InputReader
+class InputZeroMQReader : public InputReader, public RemoteControllable
{
public:
- InputZeroMQReader() = default;
+ InputZeroMQReader();
InputZeroMQReader(const InputZeroMQReader& other) = delete;
InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
~InputZeroMQReader();
@@ -170,12 +171,24 @@ class InputZeroMQReader : public InputReader
virtual int GetNextFrame(void* buffer) override;
virtual std::string GetPrintableInfo() const override;
+ /* Base function to set parameters. */
+ virtual void set_parameter(
+ const std::string& parameter,
+ const std::string& value) override;
+
+ /* Getting a parameter always returns a string. */
+ virtual const std::string get_parameter(
+ const std::string& parameter) const override;
+
private:
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_uri;
size_t m_max_queued_frames = 0;
ThreadsafeQueue<std::vector<uint8_t> > m_in_messages;
+ mutable std::mutex m_last_in_messages_size_mutex;
+ size_t m_last_in_messages_size = 0;
+
void RecvProcess(void);
zmq::context_t m_zmqcontext; // is thread-safe
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index c7dfd84..f6c3c34 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -66,6 +66,13 @@ struct zmq_dab_message_t
#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \
(sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t))
+InputZeroMQReader::InputZeroMQReader() :
+ InputReader(),
+ RemoteControllable("inputzmq")
+{
+ RC_ADD_PARAMETER(buffer, "Size of input buffer [us] (read-only)");
+}
+
InputZeroMQReader::~InputZeroMQReader()
{
m_running = false;
@@ -125,6 +132,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
return 0;
}
else if (incoming.size() == framesize) {
+ unique_lock<mutex> lock(m_last_in_messages_size_mutex);
+ m_last_in_messages_size--;
+ lock.unlock();
+
memcpy(buffer, &incoming.front(), framesize);
}
else {
@@ -242,6 +253,9 @@ void InputZeroMQReader::RecvProcess()
queue_size = m_in_messages.push(move(buf));
etiLog.log(trace, "ZMQ,push %zu", queue_size);
+
+ unique_lock<mutex> lock(m_last_in_messages_size_mutex);
+ m_last_in_messages_size++;
}
}
}
@@ -286,5 +300,43 @@ void InputZeroMQReader::RecvProcess()
m_in_messages.notify();
}
+// =======================================
+// Remote Control
+// =======================================
+void InputZeroMQReader::set_parameter(const string& parameter, const string& value)
+{
+ stringstream ss(value);
+ ss.exceptions ( stringstream::failbit | stringstream::badbit );
+
+ if (parameter == "buffer") {
+ throw ParameterError("Parameter " + parameter + " is read-only.");
+ }
+ else {
+ stringstream ss_err;
+ ss_err << "Parameter '" << parameter
+ << "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss_err.str());
+ }
+}
+
+const string InputZeroMQReader::get_parameter(const string& parameter) const
+{
+ stringstream ss;
+ ss << std::fixed;
+ if (parameter == "buffer") {
+ // Do not use size of the queue, as it will contain empty
+ // frames to signal timeouts
+ unique_lock<mutex> lock(m_last_in_messages_size_mutex);
+ const long time_in_buffer_us = 24000 * m_last_in_messages_size;
+ ss << time_in_buffer_us;
+ }
+ else {
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+}
+
#endif
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 6e6a7e9..a3d9ec3 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -53,6 +53,10 @@ RemoteControllerTelnet::~RemoteControllerTelnet()
void RemoteControllerTelnet::restart()
{
+ if (m_restarter_thread.joinable()) {
+ m_restarter_thread.join();
+ }
+
m_restarter_thread = std::thread(
&RemoteControllerTelnet::restart_thread,
this, 0);
@@ -105,7 +109,9 @@ void RemoteControllerTelnet::restart_thread(long)
m_active = false;
m_io_service.stop();
- m_child_thread.join();
+ if (m_child_thread.joinable()) {
+ m_child_thread.join();
+ }
m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0);
}
@@ -342,6 +348,10 @@ RemoteControllerZmq::~RemoteControllerZmq() {
void RemoteControllerZmq::restart()
{
+ if (m_restarter_thread.joinable()) {
+ m_restarter_thread.join();
+ }
+
m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this);
}