From b0f2bade7a34aaff6573c81d9875d321dd889370 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Oct 2016 16:00:38 +0200 Subject: Rework remotecontrol --- src/RemoteControl.h | 267 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 175 insertions(+), 92 deletions(-) (limited to 'src/RemoteControl.h') diff --git a/src/RemoteControl.h b/src/RemoteControl.h index e7bb7fe..df99386 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -3,10 +3,10 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li - This module adds remote-control capability to some of the dabmod modules. + This module adds remote-control capability to some of the dabmux modules. see testremotecontrol/test.cpp for an example of how to use this. */ /* @@ -26,8 +26,15 @@ along with ODR-DabMux. If not, see . */ -#ifndef _REMOTECONTROL_H -#define _REMOTECONTROL_H +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(HAVE_ZEROMQ) +# include "zmq.hpp" +#endif #include #include @@ -36,14 +43,13 @@ #include #include #include -#include -#include #include #include #include #include #include +#include "Log.h" #define RC_ADD_PARAMETER(p, desc) { \ std::vector p; \ @@ -52,7 +58,6 @@ m_parameters.push_back(p); \ } - class ParameterError : public std::exception { public: @@ -71,9 +76,6 @@ class RemoteControllable; */ class BaseRemoteController { public: - /* Add a new controllable under this controller's command */ - virtual void enrol(RemoteControllable* controllable) = 0; - /* When this returns one, the remote controller cannot be * used anymore, and must be restarted by dabmux */ @@ -90,10 +92,13 @@ class BaseRemoteController { /* Objects that support remote control must implement the following class */ class RemoteControllable { public: + RemoteControllable(const std::string& name) : + m_name(name) {} - RemoteControllable(std::string name) : m_name(name) {} + RemoteControllable(const RemoteControllable& other) = delete; + RemoteControllable& operator=(const RemoteControllable& other) = delete; - virtual ~RemoteControllable() {} + virtual ~RemoteControllable(); /* return a short name used to identify the controllable. * It might be used in the commands the user has to type, so keep @@ -101,33 +106,19 @@ class RemoteControllable { */ virtual std::string get_rc_name() const { return m_name; } - /* Tell the controllable to enrol at the given controller */ - virtual void enrol_at(BaseRemoteController& controller) { - controller.enrol(this); - } - - virtual void enrol_at(std::shared_ptr controller) { - controller->enrol(this); - } - /* Return a list of possible parameters that can be set */ - virtual std::list get_supported_parameters() const { - std::list parameterlist; - for (std::list< std::vector >::const_iterator it = m_parameters.begin(); - it != m_parameters.end(); ++it) { - parameterlist.push_back((*it)[0]); - } - return parameterlist; - } + virtual std::list get_supported_parameters() const; /* Return a mapping of the descriptions of all parameters */ virtual std::list< std::vector > - get_parameter_descriptions() const { - return m_parameters; - } + get_parameter_descriptions() const + { + return m_parameters; + } /* Base function to set parameters. */ - virtual void set_parameter(const std::string& parameter, + virtual void set_parameter( + const std::string& parameter, const std::string& value) = 0; /* Getting a parameter always returns a string. */ @@ -138,30 +129,93 @@ class RemoteControllable { std::list< std::vector > m_parameters; }; +/* Holds all our remote controllers and controlled object. + */ +class RemoteControllers { + public: + void add_controller(std::shared_ptr rc) { + m_controllers.push_back(rc); + } + + void enrol(RemoteControllable *rc) { + controllables.push_back(rc); + } + + void remove_controllable(RemoteControllable *rc) { + controllables.remove(rc); + } + + void check_faults() { + for (auto &controller : m_controllers) { + if (controller->fault_detected()) + { + etiLog.level(warn) << + "Detected Remote Control fault, restarting it"; + controller->restart(); + } + } + } + + std::list< std::vector > + get_param_list_values(const std::string& name) { + RemoteControllable* controllable = get_controllable_(name); + + std::list< std::vector > allparams; + for (auto ¶m : controllable->get_supported_parameters()) { + std::vector item; + item.push_back(param); + item.push_back(controllable->get_parameter(param)); + + allparams.push_back(item); + } + return allparams; + } + + std::string get_param(const std::string& name, const std::string& param) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->get_parameter(param); + } + + void set_param(const std::string& name, const std::string& param, const std::string& value) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->set_parameter(param, value); + } + + std::list controllables; + + private: + RemoteControllable* get_controllable_(const std::string& name); + + std::list > m_controllers; +}; + +extern RemoteControllers rcs; + + /* Implements a Remote controller based on a simple telnet CLI * that listens on localhost */ class RemoteControllerTelnet : public BaseRemoteController { public: - RemoteControllerTelnet() : - m_running(false), - m_io_service(), - m_fault(false), + RemoteControllerTelnet() + : m_running(false), m_fault(false), m_port(0) { } - RemoteControllerTelnet(int port) : - m_running(false), - m_io_service(), - m_fault(false), - m_port(port) - { - restart(); - } + RemoteControllerTelnet(int port) + : m_running(true), m_fault(false), + m_child_thread(&RemoteControllerTelnet::process, this, 0), + m_port(port) { } - ~RemoteControllerTelnet(); + RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; + RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - void enrol(RemoteControllable* controllable) { - m_cohort.push_back(controllable); + ~RemoteControllerTelnet() { + m_running = false; + m_fault = false; + if (m_port) { + m_child_thread.interrupt(); + m_child_thread.join(); + } } virtual bool fault_detected() { return m_fault; } @@ -178,14 +232,6 @@ class RemoteControllerTelnet : public BaseRemoteController { void reply(boost::asio::ip::tcp::socket& socket, std::string message); - void handle_accept( - const boost::system::error_code& boost_error, - boost::shared_ptr< boost::asio::ip::tcp::socket > socket, - boost::asio::ip::tcp::acceptor& acceptor); - - RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other); - RemoteControllerTelnet(const RemoteControllerTelnet& other); - std::vector tokenise_(std::string message) { std::vector all_tokens; @@ -197,6 +243,67 @@ class RemoteControllerTelnet : public BaseRemoteController { return all_tokens; } + std::atomic m_running; + + /* This is set to true if a fault occurred */ + std::atomic m_fault; + boost::thread m_restarter_thread; + + boost::thread m_child_thread; + + int m_port; +}; + +#if 0 // #if defined(HAVE_ZEROMQ) +/* Implements a Remote controller using zmq transportlayer + * that listens on localhost + */ +class RemoteControllerZmq : public BaseRemoteController { + public: + RemoteControllerZmq() + : m_running(false), m_fault(false), + m_zmqContext(1), + m_endpoint("") { } + + RemoteControllerZmq(std::string endpoint) + : m_running(true), m_fault(false), + m_zmqContext(1), + m_endpoint(endpoint), + m_child_thread(&RemoteControllerZmq::process, this) { } + + RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; + RemoteControllerZmq(const RemoteControllerZmq& other) = delete; + + ~RemoteControllerZmq() { + m_running = false; + m_fault = false; + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + } + + void enrol(RemoteControllable* controllable) { + m_cohort.push_back(controllable); + } + + void disengage(RemoteControllable* controllable) { + m_cohort.remove(controllable); + } + + virtual bool fault_detected() { return m_fault; } + + virtual void restart(); + + private: + void restart_thread(); + + void recv_all(zmq::socket_t &pSocket, std::vector &message); + void send_ok_reply(zmq::socket_t &pSocket); + void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); + void process(); + + RemoteControllable* get_controllable_(std::string name) { for (std::list::iterator it = m_cohort.begin(); it != m_cohort.end(); ++it) { @@ -208,15 +315,14 @@ class RemoteControllerTelnet : public BaseRemoteController { throw ParameterError("Module name unknown"); } - std::list< std::vector > - get_parameter_descriptions_(std::string name) { + std::string get_param_(std::string name, std::string param) { RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter_descriptions(); + return controllable->get_parameter(param); } - std::list get_param_list_(std::string name) { + void set_param_(std::string name, std::string param, std::string value) { RemoteControllable* controllable = get_controllable_(name); - return controllable->get_supported_parameters(); + return controllable->set_parameter(param, value); } std::list< std::vector > @@ -224,55 +330,32 @@ class RemoteControllerTelnet : public BaseRemoteController { RemoteControllable* controllable = get_controllable_(name); std::list< std::vector > allparams; - std::list params = controllable->get_supported_parameters(); - for (std::list::iterator it = params.begin(); - it != params.end(); ++it) { + + for (auto ¶m : controllable->get_supported_parameters()) { std::vector item; - item.push_back(*it); - item.push_back(controllable->get_parameter(*it)); + item.push_back(param); + item.push_back(controllable->get_parameter(param)); allparams.push_back(item); } + return allparams; } - std::string get_param_(std::string name, std::string param) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter(param); - } - - void set_param_(std::string name, std::string param, std::string value) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->set_parameter(param, value); - } std::atomic m_running; - boost::asio::io_service m_io_service; - /* This is set to true if a fault occurred */ std::atomic m_fault; boost::thread m_restarter_thread; - boost::thread m_child_thread; + zmq::context_t m_zmqContext; /* This controller commands the controllables in the cohort */ std::list m_cohort; - int m_port; -}; - - -/* The Dummy remote controller does nothing, and never fails - */ -class RemoteControllerDummy : public BaseRemoteController { - public: - void enrol(RemoteControllable*) {} - - bool fault_detected() { return false; } - - virtual void restart() {} + std::string m_endpoint; + boost::thread m_child_thread; }; - #endif -- cgit v1.2.3 From 17e6a246149c11bac667a233fff1a33a1d06a1fb Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Oct 2016 16:30:08 +0200 Subject: Add ZeroMQ RC --- configure.ac | 1 + doc/example.mux | 10 +++++++ doc/zmq_remote.py | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/DabMux.cpp | 7 +++-- src/RemoteControl.cpp | 14 +++++----- src/RemoteControl.h | 57 +++------------------------------------ 6 files changed, 101 insertions(+), 63 deletions(-) create mode 100755 doc/zmq_remote.py (limited to 'src/RemoteControl.h') diff --git a/configure.ac b/configure.ac index c04492c..09ca8c1 100644 --- a/configure.ac +++ b/configure.ac @@ -187,6 +187,7 @@ AC_CHECK_LIB(zmq, zmq_init, [] , AC_MSG_ERROR(ZeroMQ libzmq is required)) AC_DEFINE([HAVE_INPUT_ZEROMQ], [1], [Define if ZeroMQ input is enabled]) AC_DEFINE([HAVE_OUTPUT_ZEROMQ], [1], [Define if ZeroMQ output is enabled]) +AC_DEFINE([HAVE_RC_ZEROMQ], [1], [Define if ZeroMQ enabled for rc]) # Link against cURL AM_CONDITIONAL([HAVE_CURL_TEST], diff --git a/doc/example.mux b/doc/example.mux index 5a65829..3f2d6f4 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -50,6 +50,16 @@ remotecontrol { ; Set the port to 0 to disable the server telnetport 12721 + ; The remote control is also accessible through a ZMQ REQ/REP socket, + ; and is useful for machine-triggered interactions. It supports the + ; same commands as the telnet RC. + ; The example code in doc/zmq_remote.py illustrates how to use this rc. + ; To disable the zeromq endpoint, remove the zmqendpoint line. + ; By specifying "lo" in the URL, we make the server only accessible + ; from localhost. You can write tcp://*:12722 to make it accessible + ; on all interfaces. + zmqendpoint tcp://lo:12722 + ; the remote control server makes use of the unique identifiers ; for the subchannels, services and components. Make sure you ; chose them so that you can identify them. diff --git a/doc/zmq_remote.py b/doc/zmq_remote.py new file mode 100755 index 0000000..bc9dd5d --- /dev/null +++ b/doc/zmq_remote.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# +# This is an example program that illustrates +# how to interact with the zeromq remote control +# +# LICENSE: see bottom of file + +import sys +import zmq + +context = zmq.Context() + +sock = context.socket(zmq.REQ) + +if len(sys.argv) < 2: + print("Usage: program url cmd [args...]") + sys.exit(1) + +sock.connect(sys.argv[1]) + +message_parts = sys.argv[2:] + +# first do a ping test + +print("ping") +sock.send("ping") +data = sock.recv_multipart() +print("Received: {}".format(len(data))) +for i,part in enumerate(data): + print(" {}".format(part)) + +for i, part in enumerate(message_parts): + if i == len(message_parts) - 1: + f = 0 + else: + f = zmq.SNDMORE + + print("Send {}({}): '{}'".format(i, f, part)) + + sock.send(part, flags=f) + +data = sock.recv_multipart() + +print("Received: {}".format(len(data))) +for i,part in enumerate(data): + print(" RX {}: {}".format(i, part)) + + + +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# For more information, please refer to + + diff --git a/src/DabMux.cpp b/src/DabMux.cpp index cc6c327..aefa701 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -273,11 +273,14 @@ int main(int argc, char *argv[]) /************** READ REMOTE CONTROL PARAMETERS *************/ int telnetport = pt.get("remotecontrol.telnetport", 0); - - if (telnetport != 0) { auto rc = std::make_shared(telnetport); + rcs.add_controller(rc); + } + auto zmqendpoint = pt.get("remotecontrol.zmqendpoint", ""); + if (not zmqendpoint.empty()) { + auto rc = std::make_shared(zmqendpoint); rcs.add_controller(rc); } diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index bca0b41..12ab84e 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -120,7 +120,7 @@ void RemoteControllerTelnet::process(long) boost::asio::streambuf buffer; length = boost::asio::read_until( socket, buffer, "\n", ignored_error); - std::istream str(&buffer); + std::istream str(&buffer); std::getline(str, in_message); if (length == 0) { @@ -268,7 +268,7 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) } -#if 0 // #if defined(HAVE_ZEROMQ) +#if defined(HAVE_RC_ZEROMQ) void RemoteControllerZmq::restart() { @@ -352,8 +352,8 @@ void RemoteControllerZmq::process() send_ok_reply(repSocket); } else if (msg.size() == 1 && command == "list") { - size_t cohort_size = m_cohort.size(); - for (auto &controllable : m_cohort) { + size_t cohort_size = rcs.controllables.size(); + for (auto &controllable : rcs.controllables) { std::stringstream ss; ss << controllable->get_rc_name(); @@ -369,7 +369,7 @@ void RemoteControllerZmq::process() else if (msg.size() == 2 && command == "show") { std::string module((char*) msg[1].data(), msg[1].size()); try { - list< vector > r = get_param_list_values_(module); + list< vector > r = rcs.get_param_list_values(module); size_t r_size = r.size(); for (auto ¶m_val : r) { std::stringstream ss; @@ -390,7 +390,7 @@ void RemoteControllerZmq::process() std::string parameter((char*) msg[2].data(), msg[2].size()); try { - std::string value = get_param_(module, parameter); + std::string value = rcs.get_param(module, parameter); zmq::message_t msg(value.size()); memcpy ((void*) msg.data(), value.data(), value.size()); repSocket.send(msg, 0); @@ -405,7 +405,7 @@ void RemoteControllerZmq::process() std::string value((char*) msg[3].data(), msg[3].size()); try { - set_param_(module, parameter, value); + rcs.set_param(module, parameter, value); send_ok_reply(repSocket); } catch (ParameterError &err) { diff --git a/src/RemoteControl.h b/src/RemoteControl.h index df99386..c682826 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -32,7 +32,7 @@ # include "config.h" #endif -#if defined(HAVE_ZEROMQ) +#if defined(HAVE_RC_ZEROMQ) # include "zmq.hpp" #endif @@ -254,7 +254,7 @@ class RemoteControllerTelnet : public BaseRemoteController { int m_port; }; -#if 0 // #if defined(HAVE_ZEROMQ) +#if defined(HAVE_RC_ZEROMQ) /* Implements a Remote controller using zmq transportlayer * that listens on localhost */ @@ -265,7 +265,7 @@ class RemoteControllerZmq : public BaseRemoteController { m_zmqContext(1), m_endpoint("") { } - RemoteControllerZmq(std::string endpoint) + RemoteControllerZmq(const std::string& endpoint) : m_running(true), m_fault(false), m_zmqContext(1), m_endpoint(endpoint), @@ -283,14 +283,6 @@ class RemoteControllerZmq : public BaseRemoteController { } } - void enrol(RemoteControllable* controllable) { - m_cohort.push_back(controllable); - } - - void disengage(RemoteControllable* controllable) { - m_cohort.remove(controllable); - } - virtual bool fault_detected() { return m_fault; } virtual void restart(); @@ -303,46 +295,6 @@ class RemoteControllerZmq : public BaseRemoteController { void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); void process(); - - RemoteControllable* get_controllable_(std::string name) { - for (std::list::iterator it = m_cohort.begin(); - it != m_cohort.end(); ++it) { - if ((*it)->get_rc_name() == name) - { - return *it; - } - } - throw ParameterError("Module name unknown"); - } - - std::string get_param_(std::string name, std::string param) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter(param); - } - - void set_param_(std::string name, std::string param, std::string value) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->set_parameter(param, value); - } - - std::list< std::vector > - get_param_list_values_(std::string name) { - RemoteControllable* controllable = get_controllable_(name); - - std::list< std::vector > allparams; - - for (auto ¶m : controllable->get_supported_parameters()) { - std::vector item; - item.push_back(param); - item.push_back(controllable->get_parameter(param)); - - allparams.push_back(item); - } - - return allparams; - } - - std::atomic m_running; /* This is set to true if a fault occurred */ @@ -351,9 +303,6 @@ class RemoteControllerZmq : public BaseRemoteController { zmq::context_t m_zmqContext; - /* This controller commands the controllables in the cohort */ - std::list m_cohort; - std::string m_endpoint; boost::thread m_child_thread; }; -- cgit v1.2.3