diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-08-21 10:11:35 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-08-21 10:11:35 +0200 | 
| commit | 86ea8cd8b8b5af7917db28ae30cfb2d2886868fe (patch) | |
| tree | 7222d8e077dd2155eecac68b8c78330bcfe5dc80 /lib/RemoteControl.cpp | |
| parent | 86fbf91f7323a2c5626a357b8414b15e20c19c9e (diff) | |
| parent | 5ee85c4ac41337e383eb1a735bc05f1e5d46a98f (diff) | |
| download | dabmux-86ea8cd8b8b5af7917db28ae30cfb2d2886868fe.tar.gz dabmux-86ea8cd8b8b5af7917db28ae30cfb2d2886868fe.tar.bz2 dabmux-86ea8cd8b8b5af7917db28ae30cfb2d2886868fe.zip | |
Merge branch 'ediInput' into next
Diffstat (limited to 'lib/RemoteControl.cpp')
| -rw-r--r-- | lib/RemoteControl.cpp | 581 | 
1 files changed, 581 insertions, 0 deletions
| diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp new file mode 100644 index 0000000..878af59 --- /dev/null +++ b/lib/RemoteControl.cpp @@ -0,0 +1,581 @@ +/* +   Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org + */ +/* +   This program is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as published by +   the Free Software Foundation, either version 3 of the License, or +   (at your option) any later version. + +   This program is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with this program.  If not, see <https://www.gnu.org/licenses/>. + */ +#include <list> +#include <string> +#include <iostream> +#include <string> +#include <algorithm> + +#include "RemoteControl.h" + +using namespace std; + +RemoteControllers rcs; + +RemoteControllerTelnet::~RemoteControllerTelnet() +{ +    m_active = false; + +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } +} + +void RemoteControllerTelnet::restart() +{ +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    m_restarter_thread = std::thread( +            &RemoteControllerTelnet::restart_thread, +            this, 0); +} + +RemoteControllable::~RemoteControllable() { +    rcs.remove_controllable(this); +} + +std::list<std::string> RemoteControllable::get_supported_parameters() const { +    std::list<std::string> parameterlist; +    for (const auto& param : m_parameters) { +        parameterlist.push_back(param[0]); +    } +    return parameterlist; +} + +void RemoteControllers::add_controller(std::shared_ptr<BaseRemoteController> rc) { +    m_controllers.push_back(rc); +} + +void RemoteControllers::enrol(RemoteControllable *rc) { +    controllables.push_back(rc); +} + +void RemoteControllers::remove_controllable(RemoteControllable *rc) { +    controllables.remove(rc); +} + +std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(const std::string& name) { +    RemoteControllable* controllable = get_controllable_(name); + +    std::list< std::vector<std::string> > allparams; +    for (auto ¶m : controllable->get_supported_parameters()) { +        std::vector<std::string> item; +        item.push_back(param); +        try { +            item.push_back(controllable->get_parameter(param)); +        } +        catch (const ParameterError &e) { +            item.push_back(std::string("error: ") + e.what()); +        } + +        allparams.push_back(item); +    } +    return allparams; +} + +std::string RemoteControllers::get_param(const std::string& name, const std::string& param) { +    RemoteControllable* controllable = get_controllable_(name); +    return controllable->get_parameter(param); +} + +void RemoteControllers::check_faults() { +    for (auto &controller : m_controllers) { +        if (controller->fault_detected()) { +            etiLog.level(warn) << +                "Detected Remote Control fault, restarting it"; +            controller->restart(); +        } +    } +} + +RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) +{ +    auto rc = std::find_if(controllables.begin(), controllables.end(), +            [&](RemoteControllable* r) { return r->get_rc_name() == name; }); + +    if (rc == controllables.end()) { +        throw ParameterError("Module name unknown"); +    } +    else { +        return *rc; +    } +} + +void RemoteControllers::set_param( +        const std::string& name, +        const std::string& param, +        const std::string& value) +{ +    etiLog.level(info) << "RC: Setting " << name << " " << param +        << " to " << value; +    RemoteControllable* controllable = get_controllable_(name); +    try { +        return controllable->set_parameter(param, value); +    } +    catch (const ios_base::failure& e) { +        etiLog.level(info) << "RC: Failed to set " << name << " " << param +        << " to " << value << ": " << e.what(); +        throw ParameterError("Cannot understand value"); +    } +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerTelnet::restart_thread(long) +{ +    m_active = false; + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } + +    m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); +} + +void RemoteControllerTelnet::handle_accept(Socket::TCPSocket&& socket) +{ +    const std::string welcome = PACKAGE_NAME " Remote Control CLI\n" +                                "Write 'help' for help.\n" +                                "**********\n"; +    const std::string prompt = "> "; + +    std::string in_message; + +    try { +        etiLog.level(info) << "RC: Accepted"; + +        socket.sendall(welcome.data(), welcome.size()); + +        while (m_active and in_message != "quit") { +            socket.sendall(prompt.data(), prompt.size()); + +            stringstream in_message_stream; + +            char last_char = '\0'; +            try { +                while (last_char != '\n') { +                    try { +                        auto ret = socket.recv(&last_char, 1, 0, 1000); +                        if (ret == 1) { +                            in_message_stream << last_char; +                        } +                        else { +                            break; +                        } +                    } +                    catch (const Socket::TCPSocket::Timeout&) { +                        if (not m_active) { +                            break; +                        } +                    } +                } +            } +            catch (const Socket::TCPSocket::Interrupted&) { +                in_message_stream.clear(); +            } + + +            if (in_message_stream.str().size() == 0) { +                etiLog.level(info) << "RC: Connection terminated"; +                break; +            } + +            std::getline(in_message_stream, in_message); + +            while (in_message.length() > 0 && +                    (in_message[in_message.length()-1] == '\r' || +                     in_message[in_message.length()-1] == '\n')) { +                in_message.erase(in_message.length()-1, 1); +            } + +            if (in_message.length() == 0) { +                continue; +            } + +            etiLog.level(info) << "RC: Got message '" << in_message << "'"; + +            dispatch_command(socket, in_message); +        } +        etiLog.level(info) << "RC: Closing socket"; +        socket.close(); +    } +    catch (const std::exception& e) { +        etiLog.level(error) << "Remote control caught exception: " << e.what(); +    } +} + +void RemoteControllerTelnet::process(long) +{ +    try { +        m_active = true; + +        m_socket.listen(m_port, "localhost"); + +        etiLog.level(info) << "RC: Waiting for connection on port " << m_port; +        while (m_active) { +            auto sock = m_socket.accept(1000); + +            if (sock.valid()) { +                handle_accept(move(sock)); +                etiLog.level(info) << "RC: Connection closed. Waiting for connection on port " << m_port; +            } +        } +    } +    catch (const runtime_error& e) { +        etiLog.level(warn) << "RC: Encountered error: " << e.what(); +    } + +    etiLog.level(info) << "RC: Leaving"; +    m_fault = true; +} + +static std::vector<std::string> tokenise(const std::string& message) { +    stringstream ss(message); +    std::vector<std::string> all_tokens; +    std::string item; + +    while (std::getline(ss, item, ' ')) { +        all_tokens.push_back(move(item)); +    } +    return all_tokens; +} + + +void RemoteControllerTelnet::dispatch_command(Socket::TCPSocket& socket, string command) +{ +    vector<string> cmd = tokenise(command); + +    if (cmd[0] == "help") { +        reply(socket, +                "The following commands are supported:\n" +                "  list\n" +                "    * Lists the modules that are loaded and their parameters\n" +                "  show MODULE\n" +                "    * Lists all parameters and their values from module MODULE\n" +                "  get MODULE PARAMETER\n" +                "    * Gets the value for the specified PARAMETER from module MODULE\n" +                "  set MODULE PARAMETER VALUE\n" +                "    * Sets the value for the PARAMETER ofr module MODULE\n" +                "  quit\n" +                "    * Terminate this session\n" +                "\n"); +    } +    else if (cmd[0] == "list") { +        stringstream ss; + +        if (cmd.size() == 1) { +            for (auto &controllable : rcs.controllables) { +                ss << controllable->get_rc_name() << endl; + +                list< vector<string> > params = controllable->get_parameter_descriptions(); +                for (auto ¶m : params) { +                    ss << "\t" << param[0] << " : " << param[1] << endl; +                } +            } +        } +        else { +            reply(socket, "Too many arguments for command 'list'"); +        } + +        reply(socket, ss.str()); +    } +    else if (cmd[0] == "show") { +        if (cmd.size() == 2) { +            try { +                stringstream ss; +                list< vector<string> > r = rcs.get_param_list_values(cmd[1]); +                for (auto ¶m_val : r) { +                    ss << param_val[0] << ": " << param_val[1] << endl; +                } +                reply(socket, ss.str()); + +            } +            catch (const ParameterError &e) { +                reply(socket, e.what()); +            } +        } +        else { +            reply(socket, "Incorrect parameters for command 'show'"); +        } +    } +    else if (cmd[0] == "get") { +        if (cmd.size() == 3) { +            try { +                string r = rcs.get_param(cmd[1], cmd[2]); +                reply(socket, r); +            } +            catch (const ParameterError &e) { +                reply(socket, e.what()); +            } +        } +        else { +            reply(socket, "Incorrect parameters for command 'get'"); +        } +    } +    else if (cmd[0] == "set") { +        if (cmd.size() >= 4) { +            try { +                stringstream new_param_value; +                for (size_t i = 3; i < cmd.size(); i++) { +                    new_param_value << cmd[i]; + +                    if (i+1 < cmd.size()) { +                        new_param_value << " "; +                    } +                } + +                rcs.set_param(cmd[1], cmd[2], new_param_value.str()); +                reply(socket, "ok"); +            } +            catch (const ParameterError &e) { +                reply(socket, e.what()); +            } +            catch (const exception &e) { +                reply(socket, "Error: Invalid parameter value. "); +            } +        } +        else { +            reply(socket, "Incorrect parameters for command 'set'"); +        } +    } +    else if (cmd[0] == "quit") { +        reply(socket, "Goodbye"); +    } +    else { +        reply(socket, "Message not understood"); +    } +} + +void RemoteControllerTelnet::reply(Socket::TCPSocket& socket, string message) +{ +    stringstream ss; +    ss << message << "\r\n"; +    socket.sendall(message.data(), message.size()); +} + + +#if defined(HAVE_ZEROMQ) + +RemoteControllerZmq::~RemoteControllerZmq() { +    m_active = false; +    m_fault = false; + +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } +} + +void RemoteControllerZmq::restart() +{ +    if (m_restarter_thread.joinable()) { +        m_restarter_thread.join(); +    } + +    m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerZmq::restart_thread() +{ +    m_active = false; + +    if (m_child_thread.joinable()) { +        m_child_thread.join(); +    } + +    m_child_thread = std::thread(&RemoteControllerZmq::process, this); +} + +void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) +{ +    bool more = true; +    do { +        zmq::message_t msg; +        pSocket.recv(&msg); +        std::string incoming((char*)msg.data(), msg.size()); +        message.push_back(incoming); +        more = msg.more(); +    } while (more); +} + +void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) +{ +    zmq::message_t msg(2); +    char repCode[2] = {'o', 'k'}; +    memcpy ((void*) msg.data(), repCode, 2); +    pSocket.send(msg, 0); +} + +void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) +{ +    zmq::message_t msg1(4); +    char repCode[4] = {'f', 'a', 'i', 'l'}; +    memcpy ((void*) msg1.data(), repCode, 4); +    pSocket.send(msg1, ZMQ_SNDMORE); + +    zmq::message_t msg2(error.length()); +    memcpy ((void*) msg2.data(), error.c_str(), error.length()); +    pSocket.send(msg2, 0); +} + +void RemoteControllerZmq::process() +{ +    m_fault = false; + +    // create zmq reply socket for receiving ctrl parameters +    try { +        zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + +        // connect the socket +        int hwm = 100; +        int linger = 0; +        repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +        repSocket.bind(m_endpoint.c_str()); + +        // create pollitem that polls the  ZMQ sockets +        zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; +        while (m_active) { +            zmq::poll(pollItems, 1, 100); +            std::vector<std::string> msg; + +            if (pollItems[0].revents & ZMQ_POLLIN) { +                recv_all(repSocket, msg); + +                std::string command((char*)msg[0].data(), msg[0].size()); + +                if (msg.size() == 1 && command == "ping") { +                    send_ok_reply(repSocket); +                } +                else if (msg.size() == 1 && command == "list") { +                    size_t cohort_size = rcs.controllables.size(); +                    for (auto &controllable : rcs.controllables) { +                        std::stringstream ss; +                        ss << "{ \"name\": \"" << controllable->get_rc_name() << "\"," << +                            " \"params\": { "; + +                        list< vector<string> > params = controllable->get_parameter_descriptions(); +                        size_t i = 0; +                        for (auto ¶m : params) { +                            if (i > 0) { +                                ss << ", "; +                            } + +                            ss << "\"" << param[0] << "\": " << +                                "\"" << param[1] << "\""; + +                            i++; +                        } + +                        ss << " } }"; + +                        std::string msg_s = ss.str(); + +                        zmq::message_t zmsg(ss.str().size()); +                        memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); + +                        int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; +                        repSocket.send(zmsg, flag); +                    } +                } +                else if (msg.size() == 2 && command == "show") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    try { +                        list< vector<string> > r = rcs.get_param_list_values(module); +                        size_t r_size = r.size(); +                        for (auto ¶m_val : r) { +                            std::stringstream ss; +                            ss << param_val[0] << ": " << param_val[1] << endl; +                            zmq::message_t zmsg(ss.str().size()); +                            memcpy(zmsg.data(), ss.str().data(), ss.str().size()); + +                            int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; +                            repSocket.send(zmsg, flag); +                        } +                    } +                    catch (const ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else if (msg.size() == 3 && command == "get") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); + +                    try { +                        std::string value = rcs.get_param(module, parameter); +                        zmq::message_t zmsg(value.size()); +                        memcpy ((void*) zmsg.data(), value.data(), value.size()); +                        repSocket.send(zmsg, 0); +                    } +                    catch (const ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else if (msg.size() == 4 && command == "set") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); +                    std::string value((char*) msg[3].data(), msg[3].size()); + +                    try { +                        rcs.set_param(module, parameter, value); +                        send_ok_reply(repSocket); +                    } +                    catch (const ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else { +                    send_fail_reply(repSocket, +                            "Unsupported command. commands: list, show, get, set"); +                } +            } +        } +        repSocket.close(); +    } +    catch (const zmq::error_t &e) { +        etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); +    } +    catch (const std::exception& e) { +        etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); +        m_fault = true; +    } +} + +#endif + | 
