From b0f2bade7a34aaff6573c81d9875d321dd889370 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Oct 2016 16:00:38 +0200 Subject: Rework remotecontrol --- src/ConfigParser.cpp | 19 ++- src/ConfigParser.h | 6 +- src/DabMultiplexer.cpp | 27 ++-- src/DabMultiplexer.h | 2 - src/DabMux.cpp | 17 +-- src/RemoteControl.cpp | 369 ++++++++++++++++++++++++++++++++++--------------- src/RemoteControl.h | 267 +++++++++++++++++++++++------------ 7 files changed, 464 insertions(+), 243 deletions(-) diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 6a359b7..bb1e0e0 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -142,10 +142,9 @@ uint16_t get_announcement_flag_from_ptree( return flags; } -void parse_ptree(boost::property_tree::ptree& pt, - std::shared_ptr ensemble, - std::shared_ptr rc - ) +void parse_ptree( + boost::property_tree::ptree& pt, + std::shared_ptr ensemble) { using boost::property_tree::ptree; using boost::property_tree::ptree_error; @@ -247,7 +246,7 @@ void parse_ptree(boost::property_tree::ptree& pt, pt_announcement.get_child("flags")); cl->subchanneluid = pt_announcement.get("subchannel"); - cl->enrol_at(*rc); + rcs.enrol(cl.get()); ensemble->clusters.push_back(cl); } } @@ -395,7 +394,7 @@ void parse_ptree(boost::property_tree::ptree& pt, try { setup_subchannel_from_ptree(subchan, it->second, ensemble, - subchanuid, rc); + subchanuid); } catch (runtime_error &e) { etiLog.log(error, @@ -555,14 +554,12 @@ void parse_ptree(boost::property_tree::ptree& pt, ensemble->components.push_back(component); } - } void setup_subchannel_from_ptree(DabSubchannel* subchan, boost::property_tree::ptree &pt, std::shared_ptr ensemble, - string subchanuid, - std::shared_ptr rc) + string subchanuid) { using boost::property_tree::ptree; using boost::property_tree::ptree_error; @@ -657,7 +654,7 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan, DabInputZmqMPEG* inzmq = new DabInputZmqMPEG(subchanuid, zmqconfig); - inzmq->enrol_at(*rc); + rcs.enrol(inzmq); subchan->input = inzmq; if (proto == "epmg") { @@ -723,7 +720,7 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan, DabInputZmqAAC* inzmq = new DabInputZmqAAC(subchanuid, zmqconfig); - inzmq->enrol_at(*rc); + rcs.enrol(inzmq); subchan->input = inzmq; if (proto == "epmg") { diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 217572f..1297b90 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -39,14 +39,12 @@ #include void parse_ptree(boost::property_tree::ptree& pt, - std::shared_ptr ensemble, - std::shared_ptr rc); + std::shared_ptr ensemble); void setup_subchannel_from_ptree(DabSubchannel* subchan, boost::property_tree::ptree &pt, std::shared_ptr ensemble, - std::string subchanuid, - std::shared_ptr rc); + std::string subchanuid); #endif diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 3fe3078..90d3d02 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -67,11 +67,9 @@ const unsigned short BitRateTable[64] = { }; DabMultiplexer::DabMultiplexer( - std::shared_ptr rc, boost::property_tree::ptree pt) : RemoteControllable("mux"), m_pt(pt), - m_rc(rc), timestamp(0), MNSC_increment_time(false), sync(0x49C5F8), @@ -79,8 +77,8 @@ DabMultiplexer::DabMultiplexer( ensemble(std::make_shared()), fig_carousel(ensemble) { - RC_ADD_PARAMETER(carousel, - "Set to 1 to use the new carousel"); + RC_ADD_PARAMETER(frames, + "Show number of frames generated [read-only]"); } void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf) @@ -135,10 +133,10 @@ void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf) // Run a set of checks on the configuration void DabMultiplexer::prepare() { - parse_ptree(m_pt, ensemble, m_rc); + parse_ptree(m_pt, ensemble); - this->enrol_at(m_rc); - ensemble->enrol_at(m_rc); + rcs.enrol(this); + rcs.enrol(ensemble.get()); prepare_subchannels(); prepare_services_components(); @@ -236,7 +234,7 @@ void DabMultiplexer::prepare_services_components() throw MuxInitException(); } - service->enrol_at(m_rc); + rcs.enrol(service.get()); // Adjust components type for DAB+ while (component != ensemble->components.end()) { @@ -296,8 +294,7 @@ void DabMultiplexer::prepare_services_components() component->packet.id = cur_packetid++; - component->enrol_at(m_rc); - + rcs.enrol(component); } } @@ -796,7 +793,12 @@ void DabMultiplexer::print_info(void) void DabMultiplexer::set_parameter(const std::string& parameter, const std::string& value) { - if (0) { + if (parameter == "frames") { + stringstream ss; + ss << "Parameter '" << parameter << + "' of " << get_rc_name() << + " is read-only"; + throw ParameterError(ss.str()); } else { stringstream ss; @@ -810,7 +812,8 @@ void DabMultiplexer::set_parameter(const std::string& parameter, const std::string DabMultiplexer::get_parameter(const std::string& parameter) const { stringstream ss; - if (0) { + if (parameter == "frames") { + ss << currentFrame; } else { ss << "Parameter '" << parameter << diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index e069da5..0d008be 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -55,7 +55,6 @@ class DabMultiplexer : public RemoteControllable { public: DabMultiplexer( - std::shared_ptr rc, boost::property_tree::ptree pt); void prepare(void); @@ -80,7 +79,6 @@ class DabMultiplexer : public RemoteControllable { void prepare_data_inputs(void); boost::property_tree::ptree m_pt; - std::shared_ptr m_rc; unsigned timestamp; bool MNSC_increment_time; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 689b762..cc6c327 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -135,8 +135,6 @@ using namespace std; using boost::property_tree::ptree; using boost::property_tree::ptree_error; - - volatile sig_atomic_t running = 1; /* We are not allowed to use etiLog in the signal handler, @@ -276,16 +274,14 @@ int main(int argc, char *argv[]) /************** READ REMOTE CONTROL PARAMETERS *************/ int telnetport = pt.get("remotecontrol.telnetport", 0); - std::shared_ptr rc; if (telnetport != 0) { - rc = std::make_shared(telnetport); - } - else { - rc = std::make_shared(); + auto rc = std::make_shared(telnetport); + + rcs.add_controller(rc); } - DabMultiplexer mux(rc, pt); + DabMultiplexer mux(pt); etiLog.level(info) << PACKAGE_NAME << " " << @@ -460,9 +456,8 @@ int main(int argc, char *argv[]) } /* Check every six seconds if the remote control is still working */ - if ((currentFrame % 250 == 249) && rc->fault_detected()) { - etiLog.level(warn) << "Detected Remote Control fault, restarting it"; - rc->restart(); + if (currentFrame % 250 == 249) { + rcs.check_faults(); } /* Same for statistics server */ diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 9ecb018..bca0b41 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -27,145 +27,131 @@ #include #include #include -#include #include -#include "Log.h" #include "RemoteControl.h" using boost::asio::ip::tcp; using namespace std; -RemoteControllerTelnet::~RemoteControllerTelnet() -{ - m_running = false; - m_io_service.stop(); - m_child_thread.join(); -} +RemoteControllers rcs; void RemoteControllerTelnet::restart() { - m_restarter_thread = boost::thread(&RemoteControllerTelnet::restart_thread, + m_restarter_thread = boost::thread( + &RemoteControllerTelnet::restart_thread, this, 0); } +RemoteControllable::~RemoteControllable() { + rcs.remove_controllable(this); +} + +std::list RemoteControllable::get_supported_parameters() const { + std::list parameterlist; + for (const auto& param : m_parameters) { + parameterlist.push_back(param[0]); + } + return parameterlist; +} + +RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) { + auto rc = std::find_if(controllables.begin(), controllables.end(), + [&](RemoteControllable* r) { return r->get_rc_name() == name; }); + + if (rc == controllables.end()) { + throw ParameterError("Module name unknown"); + } + else { + return *rc; + } +} + // This runs in a separate thread, because // it would take too long to be done in the main loop // thread. void RemoteControllerTelnet::restart_thread(long) { m_running = false; - m_io_service.stop(); - m_child_thread.join(); + if (m_port) { + m_child_thread.interrupt(); + m_child_thread.join(); + } m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0); } -void RemoteControllerTelnet::handle_accept( - const boost::system::error_code& boost_error, - boost::shared_ptr< boost::asio::ip::tcp::socket > socket, - boost::asio::ip::tcp::acceptor& acceptor) +void RemoteControllerTelnet::process(long) { - - const std::string welcome = "ODR-DabMux Remote Control CLI\n" - "Write 'help' for help.\n" - "**********\n"; - const std::string prompt = "> "; + std::string m_welcome = "ODR-DabMux Remote Control CLI\n" + "Write 'help' for help.\n" + "**********\n"; + std::string m_prompt = "> "; std::string in_message; size_t length; - if (boost_error) - { - etiLog.level(error) << "RC: Error accepting connection"; - return; - } - try { - etiLog.level(info) << "RC: Accepted"; - - boost::system::error_code ignored_error; - - boost::asio::write(*socket, boost::asio::buffer(welcome), - boost::asio::transfer_all(), - ignored_error); - - while (m_running && in_message != "quit") { - boost::asio::write(*socket, boost::asio::buffer(prompt), - boost::asio::transfer_all(), - ignored_error); + boost::asio::io_service io_service; + tcp::acceptor acceptor(io_service, tcp::endpoint( + boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); + while (m_running) { in_message = ""; - boost::asio::streambuf buffer; - length = boost::asio::read_until(*socket, buffer, "\n", ignored_error); + tcp::socket socket(io_service); - std::istream str(&buffer); - std::getline(str, in_message); + acceptor.accept(socket); - if (length == 0) { - etiLog.level(info) << "RC: Connection terminated"; - break; - } + boost::system::error_code ignored_error; - while (in_message.length() > 0 && - (in_message[in_message.length()-1] == '\r' || - in_message[in_message.length()-1] == '\n')) { - in_message.erase(in_message.length()-1, 1); - } + boost::asio::write(socket, boost::asio::buffer(m_welcome), + boost::asio::transfer_all(), + ignored_error); - if (in_message.length() == 0) { - continue; - } + while (m_running && in_message != "quit") { + boost::asio::write(socket, boost::asio::buffer(m_prompt), + boost::asio::transfer_all(), + ignored_error); - etiLog.level(info) << "RC: Got message '" << in_message << "'"; + in_message = ""; - dispatch_command(*socket, in_message); - } - etiLog.level(info) << "RC: Closing socket"; - socket->close(); - } - catch (std::exception& e) - { - etiLog.level(error) << "Remote control caught exception: " << e.what(); - } -} + boost::asio::streambuf buffer; + length = boost::asio::read_until( socket, buffer, "\n", ignored_error); -void RemoteControllerTelnet::process(long) -{ - m_running = true; - - while (m_running) { - m_io_service.reset(); + std::istream str(&buffer); + std::getline(str, in_message); - tcp::acceptor acceptor(m_io_service, tcp::endpoint( - boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); + if (length == 0) { + std::cerr << "RC: Connection terminated" << std::endl; + break; + } + while (in_message.length() > 0 && + (in_message[in_message.length()-1] == '\r' || + in_message[in_message.length()-1] == '\n')) { + in_message.erase(in_message.length()-1, 1); + } - // Add a job to start accepting connections. - boost::shared_ptr socket( - new tcp::socket(acceptor.get_io_service())); + if (in_message.length() == 0) { + continue; + } - // Add an accept call to the service. This will prevent io_service::run() - // from returning. - etiLog.level(info) << "RC: Waiting for connection on port " << m_port; - acceptor.async_accept(*socket, - boost::bind(&RemoteControllerTelnet::handle_accept, - this, - boost::asio::placeholders::error, - socket, - boost::ref(acceptor))); + std::cerr << "RC: Got message '" << in_message << "'" << std::endl; - // Process event loop. - m_io_service.run(); + dispatch_command(socket, in_message); + } + std::cerr << "RC: Closing socket" << std::endl; + socket.close(); + } + } + catch (std::exception& e) { + std::cerr << "Remote control caught exception: " << e.what() << std::endl; + m_fault = true; } - - etiLog.level(info) << "RC: Leaving"; - m_fault = true; } - void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) { vector cmd = tokenise_(command); @@ -189,16 +175,12 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman stringstream ss; if (cmd.size() == 1) { - for (list::iterator it = m_cohort.begin(); - it != m_cohort.end(); ++it) { - ss << (*it)->get_rc_name() << endl; - - list< vector >::iterator param; - list< vector > params = (*it)->get_parameter_descriptions(); - for (param = params.begin(); - param != params.end(); - ++param) { - ss << "\t" << (*param)[0] << " : " << (*param)[1] << endl; + for (auto &controllable : rcs.controllables) { + ss << controllable->get_rc_name() << endl; + + list< vector > params = controllable->get_parameter_descriptions(); + for (auto ¶m : params) { + ss << "\t" << param[0] << " : " << param[1] << endl; } } } @@ -212,10 +194,9 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman if (cmd.size() == 2) { try { stringstream ss; - list< vector > r = get_param_list_values_(cmd[1]); - for (list< vector >::iterator it = r.begin(); - it != r.end(); ++it) { - ss << (*it)[0] << ": " << (*it)[1] << endl; + list< vector > r = rcs.get_param_list_values(cmd[1]); + for (auto ¶m_val : r) { + ss << param_val[0] << ": " << param_val[1] << endl; } reply(socket, ss.str()); @@ -224,23 +205,21 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman reply(socket, e.what()); } } - else - { + else { reply(socket, "Incorrect parameters for command 'show'"); } } else if (cmd[0] == "get") { if (cmd.size() == 3) { try { - string r = get_param_(cmd[1], cmd[2]); + string r = rcs.get_param(cmd[1], cmd[2]); reply(socket, r); } catch (ParameterError &e) { reply(socket, e.what()); } } - else - { + else { reply(socket, "Incorrect parameters for command 'get'"); } } @@ -256,7 +235,7 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman } } - set_param_(cmd[1], cmd[2], new_param_value.str()); + rcs.set_param(cmd[1], cmd[2], new_param_value.str()); reply(socket, "ok"); } catch (ParameterError &e) { @@ -288,3 +267,171 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) ignored_error); } + +#if 0 // #if defined(HAVE_ZEROMQ) + +void RemoteControllerZmq::restart() +{ + m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerZmq::restart_thread() +{ + m_running = false; + + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + + m_child_thread = boost::thread(&RemoteControllerZmq::process, this); +} + +void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector &message) +{ + bool more = true; + do { + zmq::message_t msg; + pSocket.recv(&msg); + std::string incoming((char*)msg.data(), msg.size()); + message.push_back(incoming); + more = msg.more(); + } while (more); +} + +void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) +{ + zmq::message_t msg(2); + char repCode[2] = {'o', 'k'}; + memcpy ((void*) msg.data(), repCode, 2); + pSocket.send(msg, 0); +} + +void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) +{ + zmq::message_t msg1(4); + char repCode[4] = {'f', 'a', 'i', 'l'}; + memcpy ((void*) msg1.data(), repCode, 4); + pSocket.send(msg1, ZMQ_SNDMORE); + + zmq::message_t msg2(error.length()); + memcpy ((void*) msg2.data(), error.c_str(), error.length()); + pSocket.send(msg2, 0); +} + +void RemoteControllerZmq::process() +{ + // create zmq reply socket for receiving ctrl parameters + etiLog.level(info) << "Starting zmq remote control thread"; + try { + zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + + // connect the socket + int hwm = 100; + int linger = 0; + repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + repSocket.bind(m_endpoint.c_str()); + + // create pollitem that polls the ZMQ sockets + zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; + for (;;) { + zmq::poll(pollItems, 1, 100); + std::vector msg; + + if (pollItems[0].revents & ZMQ_POLLIN) { + recv_all(repSocket, msg); + + std::string command((char*)msg[0].data(), msg[0].size()); + + if (msg.size() == 1 && command == "ping") { + send_ok_reply(repSocket); + } + else if (msg.size() == 1 && command == "list") { + size_t cohort_size = m_cohort.size(); + for (auto &controllable : m_cohort) { + std::stringstream ss; + ss << controllable->get_rc_name(); + + std::string msg_s = ss.str(); + + zmq::message_t msg(ss.str().size()); + memcpy ((void*) msg.data(), msg_s.data(), msg_s.size()); + + int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; + repSocket.send(msg, flag); + } + } + else if (msg.size() == 2 && command == "show") { + std::string module((char*) msg[1].data(), msg[1].size()); + try { + list< vector > r = get_param_list_values_(module); + size_t r_size = r.size(); + for (auto ¶m_val : r) { + std::stringstream ss; + ss << param_val[0] << ": " << param_val[1] << endl; + zmq::message_t msg(ss.str().size()); + memcpy(msg.data(), ss.str().data(), ss.str().size()); + + int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; + repSocket.send(msg, flag); + } + } + catch (ParameterError &e) { + send_fail_reply(repSocket, e.what()); + } + } + else if (msg.size() == 3 && command == "get") { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + + try { + std::string value = get_param_(module, parameter); + zmq::message_t msg(value.size()); + memcpy ((void*) msg.data(), value.data(), value.size()); + repSocket.send(msg, 0); + } + catch (ParameterError &err) { + send_fail_reply(repSocket, err.what()); + } + } + else if (msg.size() == 4 && command == "set") { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + std::string value((char*) msg[3].data(), msg[3].size()); + + try { + set_param_(module, parameter, value); + send_ok_reply(repSocket); + } + catch (ParameterError &err) { + send_fail_reply(repSocket, err.what()); + } + } + else { + send_fail_reply(repSocket, + "Unsupported command. commands: list, show, get, set"); + } + } + + // check if thread is interrupted + boost::this_thread::interruption_point(); + } + repSocket.close(); + } + catch (boost::thread_interrupted&) {} + catch (zmq::error_t &e) { + etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); + } + catch (std::exception& e) { + etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); + m_fault = true; + } +} + +#endif + diff --git a/src/RemoteControl.h b/src/RemoteControl.h index e7bb7fe..df99386 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -3,10 +3,10 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li - This module adds remote-control capability to some of the dabmod modules. + This module adds remote-control capability to some of the dabmux modules. see testremotecontrol/test.cpp for an example of how to use this. */ /* @@ -26,8 +26,15 @@ along with ODR-DabMux. If not, see . */ -#ifndef _REMOTECONTROL_H -#define _REMOTECONTROL_H +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(HAVE_ZEROMQ) +# include "zmq.hpp" +#endif #include #include @@ -36,14 +43,13 @@ #include #include #include -#include -#include #include #include #include #include #include +#include "Log.h" #define RC_ADD_PARAMETER(p, desc) { \ std::vector p; \ @@ -52,7 +58,6 @@ m_parameters.push_back(p); \ } - class ParameterError : public std::exception { public: @@ -71,9 +76,6 @@ class RemoteControllable; */ class BaseRemoteController { public: - /* Add a new controllable under this controller's command */ - virtual void enrol(RemoteControllable* controllable) = 0; - /* When this returns one, the remote controller cannot be * used anymore, and must be restarted by dabmux */ @@ -90,10 +92,13 @@ class BaseRemoteController { /* Objects that support remote control must implement the following class */ class RemoteControllable { public: + RemoteControllable(const std::string& name) : + m_name(name) {} - RemoteControllable(std::string name) : m_name(name) {} + RemoteControllable(const RemoteControllable& other) = delete; + RemoteControllable& operator=(const RemoteControllable& other) = delete; - virtual ~RemoteControllable() {} + virtual ~RemoteControllable(); /* return a short name used to identify the controllable. * It might be used in the commands the user has to type, so keep @@ -101,33 +106,19 @@ class RemoteControllable { */ virtual std::string get_rc_name() const { return m_name; } - /* Tell the controllable to enrol at the given controller */ - virtual void enrol_at(BaseRemoteController& controller) { - controller.enrol(this); - } - - virtual void enrol_at(std::shared_ptr controller) { - controller->enrol(this); - } - /* Return a list of possible parameters that can be set */ - virtual std::list get_supported_parameters() const { - std::list parameterlist; - for (std::list< std::vector >::const_iterator it = m_parameters.begin(); - it != m_parameters.end(); ++it) { - parameterlist.push_back((*it)[0]); - } - return parameterlist; - } + virtual std::list get_supported_parameters() const; /* Return a mapping of the descriptions of all parameters */ virtual std::list< std::vector > - get_parameter_descriptions() const { - return m_parameters; - } + get_parameter_descriptions() const + { + return m_parameters; + } /* Base function to set parameters. */ - virtual void set_parameter(const std::string& parameter, + virtual void set_parameter( + const std::string& parameter, const std::string& value) = 0; /* Getting a parameter always returns a string. */ @@ -138,30 +129,93 @@ class RemoteControllable { std::list< std::vector > m_parameters; }; +/* Holds all our remote controllers and controlled object. + */ +class RemoteControllers { + public: + void add_controller(std::shared_ptr rc) { + m_controllers.push_back(rc); + } + + void enrol(RemoteControllable *rc) { + controllables.push_back(rc); + } + + void remove_controllable(RemoteControllable *rc) { + controllables.remove(rc); + } + + void check_faults() { + for (auto &controller : m_controllers) { + if (controller->fault_detected()) + { + etiLog.level(warn) << + "Detected Remote Control fault, restarting it"; + controller->restart(); + } + } + } + + std::list< std::vector > + get_param_list_values(const std::string& name) { + RemoteControllable* controllable = get_controllable_(name); + + std::list< std::vector > allparams; + for (auto ¶m : controllable->get_supported_parameters()) { + std::vector item; + item.push_back(param); + item.push_back(controllable->get_parameter(param)); + + allparams.push_back(item); + } + return allparams; + } + + std::string get_param(const std::string& name, const std::string& param) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->get_parameter(param); + } + + void set_param(const std::string& name, const std::string& param, const std::string& value) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->set_parameter(param, value); + } + + std::list controllables; + + private: + RemoteControllable* get_controllable_(const std::string& name); + + std::list > m_controllers; +}; + +extern RemoteControllers rcs; + + /* Implements a Remote controller based on a simple telnet CLI * that listens on localhost */ class RemoteControllerTelnet : public BaseRemoteController { public: - RemoteControllerTelnet() : - m_running(false), - m_io_service(), - m_fault(false), + RemoteControllerTelnet() + : m_running(false), m_fault(false), m_port(0) { } - RemoteControllerTelnet(int port) : - m_running(false), - m_io_service(), - m_fault(false), - m_port(port) - { - restart(); - } + RemoteControllerTelnet(int port) + : m_running(true), m_fault(false), + m_child_thread(&RemoteControllerTelnet::process, this, 0), + m_port(port) { } - ~RemoteControllerTelnet(); + RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; + RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - void enrol(RemoteControllable* controllable) { - m_cohort.push_back(controllable); + ~RemoteControllerTelnet() { + m_running = false; + m_fault = false; + if (m_port) { + m_child_thread.interrupt(); + m_child_thread.join(); + } } virtual bool fault_detected() { return m_fault; } @@ -178,14 +232,6 @@ class RemoteControllerTelnet : public BaseRemoteController { void reply(boost::asio::ip::tcp::socket& socket, std::string message); - void handle_accept( - const boost::system::error_code& boost_error, - boost::shared_ptr< boost::asio::ip::tcp::socket > socket, - boost::asio::ip::tcp::acceptor& acceptor); - - RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other); - RemoteControllerTelnet(const RemoteControllerTelnet& other); - std::vector tokenise_(std::string message) { std::vector all_tokens; @@ -197,6 +243,67 @@ class RemoteControllerTelnet : public BaseRemoteController { return all_tokens; } + std::atomic m_running; + + /* This is set to true if a fault occurred */ + std::atomic m_fault; + boost::thread m_restarter_thread; + + boost::thread m_child_thread; + + int m_port; +}; + +#if 0 // #if defined(HAVE_ZEROMQ) +/* Implements a Remote controller using zmq transportlayer + * that listens on localhost + */ +class RemoteControllerZmq : public BaseRemoteController { + public: + RemoteControllerZmq() + : m_running(false), m_fault(false), + m_zmqContext(1), + m_endpoint("") { } + + RemoteControllerZmq(std::string endpoint) + : m_running(true), m_fault(false), + m_zmqContext(1), + m_endpoint(endpoint), + m_child_thread(&RemoteControllerZmq::process, this) { } + + RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; + RemoteControllerZmq(const RemoteControllerZmq& other) = delete; + + ~RemoteControllerZmq() { + m_running = false; + m_fault = false; + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + } + + void enrol(RemoteControllable* controllable) { + m_cohort.push_back(controllable); + } + + void disengage(RemoteControllable* controllable) { + m_cohort.remove(controllable); + } + + virtual bool fault_detected() { return m_fault; } + + virtual void restart(); + + private: + void restart_thread(); + + void recv_all(zmq::socket_t &pSocket, std::vector &message); + void send_ok_reply(zmq::socket_t &pSocket); + void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); + void process(); + + RemoteControllable* get_controllable_(std::string name) { for (std::list::iterator it = m_cohort.begin(); it != m_cohort.end(); ++it) { @@ -208,15 +315,14 @@ class RemoteControllerTelnet : public BaseRemoteController { throw ParameterError("Module name unknown"); } - std::list< std::vector > - get_parameter_descriptions_(std::string name) { + std::string get_param_(std::string name, std::string param) { RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter_descriptions(); + return controllable->get_parameter(param); } - std::list get_param_list_(std::string name) { + void set_param_(std::string name, std::string param, std::string value) { RemoteControllable* controllable = get_controllable_(name); - return controllable->get_supported_parameters(); + return controllable->set_parameter(param, value); } std::list< std::vector > @@ -224,55 +330,32 @@ class RemoteControllerTelnet : public BaseRemoteController { RemoteControllable* controllable = get_controllable_(name); std::list< std::vector > allparams; - std::list params = controllable->get_supported_parameters(); - for (std::list::iterator it = params.begin(); - it != params.end(); ++it) { + + for (auto ¶m : controllable->get_supported_parameters()) { std::vector item; - item.push_back(*it); - item.push_back(controllable->get_parameter(*it)); + item.push_back(param); + item.push_back(controllable->get_parameter(param)); allparams.push_back(item); } + return allparams; } - std::string get_param_(std::string name, std::string param) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter(param); - } - - void set_param_(std::string name, std::string param, std::string value) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->set_parameter(param, value); - } std::atomic m_running; - boost::asio::io_service m_io_service; - /* This is set to true if a fault occurred */ std::atomic m_fault; boost::thread m_restarter_thread; - boost::thread m_child_thread; + zmq::context_t m_zmqContext; /* This controller commands the controllables in the cohort */ std::list m_cohort; - int m_port; -}; - - -/* The Dummy remote controller does nothing, and never fails - */ -class RemoteControllerDummy : public BaseRemoteController { - public: - void enrol(RemoteControllable*) {} - - bool fault_detected() { return false; } - - virtual void restart() {} + std::string m_endpoint; + boost::thread m_child_thread; }; - #endif -- cgit v1.2.3