summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-10-07 16:30:08 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-10-07 16:30:08 +0200
commit17e6a246149c11bac667a233fff1a33a1d06a1fb (patch)
tree886061498f6f8b36be2b219e07930a15147b5d7a
parentb0f2bade7a34aaff6573c81d9875d321dd889370 (diff)
downloaddabmux-17e6a246149c11bac667a233fff1a33a1d06a1fb.tar.gz
dabmux-17e6a246149c11bac667a233fff1a33a1d06a1fb.tar.bz2
dabmux-17e6a246149c11bac667a233fff1a33a1d06a1fb.zip
Add ZeroMQ RC
-rw-r--r--configure.ac1
-rw-r--r--doc/example.mux10
-rwxr-xr-xdoc/zmq_remote.py75
-rw-r--r--src/DabMux.cpp7
-rw-r--r--src/RemoteControl.cpp14
-rw-r--r--src/RemoteControl.h57
6 files changed, 101 insertions, 63 deletions
diff --git a/configure.ac b/configure.ac
index c04492c..09ca8c1 100644
--- a/configure.ac
+++ b/configure.ac
@@ -187,6 +187,7 @@ AC_CHECK_LIB(zmq, zmq_init, [] , AC_MSG_ERROR(ZeroMQ libzmq is required))
AC_DEFINE([HAVE_INPUT_ZEROMQ], [1], [Define if ZeroMQ input is enabled])
AC_DEFINE([HAVE_OUTPUT_ZEROMQ], [1], [Define if ZeroMQ output is enabled])
+AC_DEFINE([HAVE_RC_ZEROMQ], [1], [Define if ZeroMQ enabled for rc])
# Link against cURL
AM_CONDITIONAL([HAVE_CURL_TEST],
diff --git a/doc/example.mux b/doc/example.mux
index 5a65829..3f2d6f4 100644
--- a/doc/example.mux
+++ b/doc/example.mux
@@ -50,6 +50,16 @@ remotecontrol {
; Set the port to 0 to disable the server
telnetport 12721
+ ; The remote control is also accessible through a ZMQ REQ/REP socket,
+ ; and is useful for machine-triggered interactions. It supports the
+ ; same commands as the telnet RC.
+ ; The example code in doc/zmq_remote.py illustrates how to use this rc.
+ ; To disable the zeromq endpoint, remove the zmqendpoint line.
+ ; By specifying "lo" in the URL, we make the server only accessible
+ ; from localhost. You can write tcp://*:12722 to make it accessible
+ ; on all interfaces.
+ zmqendpoint tcp://lo:12722
+
; the remote control server makes use of the unique identifiers
; for the subchannels, services and components. Make sure you
; chose them so that you can identify them.
diff --git a/doc/zmq_remote.py b/doc/zmq_remote.py
new file mode 100755
index 0000000..bc9dd5d
--- /dev/null
+++ b/doc/zmq_remote.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# This is an example program that illustrates
+# how to interact with the zeromq remote control
+#
+# LICENSE: see bottom of file
+
+import sys
+import zmq
+
+context = zmq.Context()
+
+sock = context.socket(zmq.REQ)
+
+if len(sys.argv) < 2:
+ print("Usage: program url cmd [args...]")
+ sys.exit(1)
+
+sock.connect(sys.argv[1])
+
+message_parts = sys.argv[2:]
+
+# first do a ping test
+
+print("ping")
+sock.send("ping")
+data = sock.recv_multipart()
+print("Received: {}".format(len(data)))
+for i,part in enumerate(data):
+ print(" {}".format(part))
+
+for i, part in enumerate(message_parts):
+ if i == len(message_parts) - 1:
+ f = 0
+ else:
+ f = zmq.SNDMORE
+
+ print("Send {}({}): '{}'".format(i, f, part))
+
+ sock.send(part, flags=f)
+
+data = sock.recv_multipart()
+
+print("Received: {}".format(len(data)))
+for i,part in enumerate(data):
+ print(" RX {}: {}".format(i, part))
+
+
+
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+#
+# For more information, please refer to <http://unlicense.org>
+
+
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index cc6c327..aefa701 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -273,11 +273,14 @@ int main(int argc, char *argv[])
/************** READ REMOTE CONTROL PARAMETERS *************/
int telnetport = pt.get<int>("remotecontrol.telnetport", 0);
-
-
if (telnetport != 0) {
auto rc = std::make_shared<RemoteControllerTelnet>(telnetport);
+ rcs.add_controller(rc);
+ }
+ auto zmqendpoint = pt.get<string>("remotecontrol.zmqendpoint", "");
+ if (not zmqendpoint.empty()) {
+ auto rc = std::make_shared<RemoteControllerZmq>(zmqendpoint);
rcs.add_controller(rc);
}
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index bca0b41..12ab84e 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -120,7 +120,7 @@ void RemoteControllerTelnet::process(long)
boost::asio::streambuf buffer;
length = boost::asio::read_until( socket, buffer, "\n", ignored_error);
- std::istream str(&buffer);
+ std::istream str(&buffer);
std::getline(str, in_message);
if (length == 0) {
@@ -268,7 +268,7 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
}
-#if 0 // #if defined(HAVE_ZEROMQ)
+#if defined(HAVE_RC_ZEROMQ)
void RemoteControllerZmq::restart()
{
@@ -352,8 +352,8 @@ void RemoteControllerZmq::process()
send_ok_reply(repSocket);
}
else if (msg.size() == 1 && command == "list") {
- size_t cohort_size = m_cohort.size();
- for (auto &controllable : m_cohort) {
+ size_t cohort_size = rcs.controllables.size();
+ for (auto &controllable : rcs.controllables) {
std::stringstream ss;
ss << controllable->get_rc_name();
@@ -369,7 +369,7 @@ void RemoteControllerZmq::process()
else if (msg.size() == 2 && command == "show") {
std::string module((char*) msg[1].data(), msg[1].size());
try {
- list< vector<string> > r = get_param_list_values_(module);
+ list< vector<string> > r = rcs.get_param_list_values(module);
size_t r_size = r.size();
for (auto &param_val : r) {
std::stringstream ss;
@@ -390,7 +390,7 @@ void RemoteControllerZmq::process()
std::string parameter((char*) msg[2].data(), msg[2].size());
try {
- std::string value = get_param_(module, parameter);
+ std::string value = rcs.get_param(module, parameter);
zmq::message_t msg(value.size());
memcpy ((void*) msg.data(), value.data(), value.size());
repSocket.send(msg, 0);
@@ -405,7 +405,7 @@ void RemoteControllerZmq::process()
std::string value((char*) msg[3].data(), msg[3].size());
try {
- set_param_(module, parameter, value);
+ rcs.set_param(module, parameter, value);
send_ok_reply(repSocket);
}
catch (ParameterError &err) {
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index df99386..c682826 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -32,7 +32,7 @@
# include "config.h"
#endif
-#if defined(HAVE_ZEROMQ)
+#if defined(HAVE_RC_ZEROMQ)
# include "zmq.hpp"
#endif
@@ -254,7 +254,7 @@ class RemoteControllerTelnet : public BaseRemoteController {
int m_port;
};
-#if 0 // #if defined(HAVE_ZEROMQ)
+#if defined(HAVE_RC_ZEROMQ)
/* Implements a Remote controller using zmq transportlayer
* that listens on localhost
*/
@@ -265,7 +265,7 @@ class RemoteControllerZmq : public BaseRemoteController {
m_zmqContext(1),
m_endpoint("") { }
- RemoteControllerZmq(std::string endpoint)
+ RemoteControllerZmq(const std::string& endpoint)
: m_running(true), m_fault(false),
m_zmqContext(1),
m_endpoint(endpoint),
@@ -283,14 +283,6 @@ class RemoteControllerZmq : public BaseRemoteController {
}
}
- void enrol(RemoteControllable* controllable) {
- m_cohort.push_back(controllable);
- }
-
- void disengage(RemoteControllable* controllable) {
- m_cohort.remove(controllable);
- }
-
virtual bool fault_detected() { return m_fault; }
virtual void restart();
@@ -303,46 +295,6 @@ class RemoteControllerZmq : public BaseRemoteController {
void send_fail_reply(zmq::socket_t &pSocket, const std::string &error);
void process();
-
- RemoteControllable* get_controllable_(std::string name) {
- for (std::list<RemoteControllable*>::iterator it = m_cohort.begin();
- it != m_cohort.end(); ++it) {
- if ((*it)->get_rc_name() == name)
- {
- return *it;
- }
- }
- throw ParameterError("Module name unknown");
- }
-
- std::string get_param_(std::string name, std::string param) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->get_parameter(param);
- }
-
- void set_param_(std::string name, std::string param, std::string value) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->set_parameter(param, value);
- }
-
- std::list< std::vector<std::string> >
- get_param_list_values_(std::string name) {
- RemoteControllable* controllable = get_controllable_(name);
-
- std::list< std::vector<std::string> > allparams;
-
- for (auto &param : controllable->get_supported_parameters()) {
- std::vector<std::string> item;
- item.push_back(param);
- item.push_back(controllable->get_parameter(param));
-
- allparams.push_back(item);
- }
-
- return allparams;
- }
-
-
std::atomic<bool> m_running;
/* This is set to true if a fault occurred */
@@ -351,9 +303,6 @@ class RemoteControllerZmq : public BaseRemoteController {
zmq::context_t m_zmqContext;
- /* This controller commands the controllables in the cohort */
- std::list<RemoteControllable*> m_cohort;
-
std::string m_endpoint;
boost::thread m_child_thread;
};