From 17e6a246149c11bac667a233fff1a33a1d06a1fb Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Oct 2016 16:30:08 +0200 Subject: Add ZeroMQ RC --- src/DabMux.cpp | 7 +++++-- src/RemoteControl.cpp | 14 ++++++------- src/RemoteControl.h | 57 +++------------------------------------------------ 3 files changed, 15 insertions(+), 63 deletions(-) (limited to 'src') diff --git a/src/DabMux.cpp b/src/DabMux.cpp index cc6c327..aefa701 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -273,11 +273,14 @@ int main(int argc, char *argv[]) /************** READ REMOTE CONTROL PARAMETERS *************/ int telnetport = pt.get("remotecontrol.telnetport", 0); - - if (telnetport != 0) { auto rc = std::make_shared(telnetport); + rcs.add_controller(rc); + } + auto zmqendpoint = pt.get("remotecontrol.zmqendpoint", ""); + if (not zmqendpoint.empty()) { + auto rc = std::make_shared(zmqendpoint); rcs.add_controller(rc); } diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index bca0b41..12ab84e 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -120,7 +120,7 @@ void RemoteControllerTelnet::process(long) boost::asio::streambuf buffer; length = boost::asio::read_until( socket, buffer, "\n", ignored_error); - std::istream str(&buffer); + std::istream str(&buffer); std::getline(str, in_message); if (length == 0) { @@ -268,7 +268,7 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) } -#if 0 // #if defined(HAVE_ZEROMQ) +#if defined(HAVE_RC_ZEROMQ) void RemoteControllerZmq::restart() { @@ -352,8 +352,8 @@ void RemoteControllerZmq::process() send_ok_reply(repSocket); } else if (msg.size() == 1 && command == "list") { - size_t cohort_size = m_cohort.size(); - for (auto &controllable : m_cohort) { + size_t cohort_size = rcs.controllables.size(); + for (auto &controllable : rcs.controllables) { std::stringstream ss; ss << controllable->get_rc_name(); @@ -369,7 +369,7 @@ void RemoteControllerZmq::process() else if (msg.size() == 2 && command == "show") { std::string module((char*) msg[1].data(), msg[1].size()); try { - list< vector > r = get_param_list_values_(module); + list< vector > r = rcs.get_param_list_values(module); size_t r_size = r.size(); for (auto ¶m_val : r) { std::stringstream ss; @@ -390,7 +390,7 @@ void RemoteControllerZmq::process() std::string parameter((char*) msg[2].data(), msg[2].size()); try { - std::string value = get_param_(module, parameter); + std::string value = rcs.get_param(module, parameter); zmq::message_t msg(value.size()); memcpy ((void*) msg.data(), value.data(), value.size()); repSocket.send(msg, 0); @@ -405,7 +405,7 @@ void RemoteControllerZmq::process() std::string value((char*) msg[3].data(), msg[3].size()); try { - set_param_(module, parameter, value); + rcs.set_param(module, parameter, value); send_ok_reply(repSocket); } catch (ParameterError &err) { diff --git a/src/RemoteControl.h b/src/RemoteControl.h index df99386..c682826 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -32,7 +32,7 @@ # include "config.h" #endif -#if defined(HAVE_ZEROMQ) +#if defined(HAVE_RC_ZEROMQ) # include "zmq.hpp" #endif @@ -254,7 +254,7 @@ class RemoteControllerTelnet : public BaseRemoteController { int m_port; }; -#if 0 // #if defined(HAVE_ZEROMQ) +#if defined(HAVE_RC_ZEROMQ) /* Implements a Remote controller using zmq transportlayer * that listens on localhost */ @@ -265,7 +265,7 @@ class RemoteControllerZmq : public BaseRemoteController { m_zmqContext(1), m_endpoint("") { } - RemoteControllerZmq(std::string endpoint) + RemoteControllerZmq(const std::string& endpoint) : m_running(true), m_fault(false), m_zmqContext(1), m_endpoint(endpoint), @@ -283,14 +283,6 @@ class RemoteControllerZmq : public BaseRemoteController { } } - void enrol(RemoteControllable* controllable) { - m_cohort.push_back(controllable); - } - - void disengage(RemoteControllable* controllable) { - m_cohort.remove(controllable); - } - virtual bool fault_detected() { return m_fault; } virtual void restart(); @@ -303,46 +295,6 @@ class RemoteControllerZmq : public BaseRemoteController { void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); void process(); - - RemoteControllable* get_controllable_(std::string name) { - for (std::list::iterator it = m_cohort.begin(); - it != m_cohort.end(); ++it) { - if ((*it)->get_rc_name() == name) - { - return *it; - } - } - throw ParameterError("Module name unknown"); - } - - std::string get_param_(std::string name, std::string param) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter(param); - } - - void set_param_(std::string name, std::string param, std::string value) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->set_parameter(param, value); - } - - std::list< std::vector > - get_param_list_values_(std::string name) { - RemoteControllable* controllable = get_controllable_(name); - - std::list< std::vector > allparams; - - for (auto ¶m : controllable->get_supported_parameters()) { - std::vector item; - item.push_back(param); - item.push_back(controllable->get_parameter(param)); - - allparams.push_back(item); - } - - return allparams; - } - - std::atomic m_running; /* This is set to true if a fault occurred */ @@ -351,9 +303,6 @@ class RemoteControllerZmq : public BaseRemoteController { zmq::context_t m_zmqContext; - /* This controller commands the controllables in the cohort */ - std::list m_cohort; - std::string m_endpoint; boost::thread m_child_thread; }; -- cgit v1.2.3