diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-10-07 16:00:38 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-10-07 16:00:38 +0200 | 
| commit | b0f2bade7a34aaff6573c81d9875d321dd889370 (patch) | |
| tree | 1230f87201b707a8822505ba3688c233b3c04a19 | |
| parent | b7ad6113a9f7373d1446c553daa24a8e0a0b3dad (diff) | |
| download | dabmux-b0f2bade7a34aaff6573c81d9875d321dd889370.tar.gz dabmux-b0f2bade7a34aaff6573c81d9875d321dd889370.tar.bz2 dabmux-b0f2bade7a34aaff6573c81d9875d321dd889370.zip | |
Rework remotecontrol
| -rw-r--r-- | src/ConfigParser.cpp | 19 | ||||
| -rw-r--r-- | src/ConfigParser.h | 6 | ||||
| -rw-r--r-- | src/DabMultiplexer.cpp | 27 | ||||
| -rw-r--r-- | src/DabMultiplexer.h | 2 | ||||
| -rw-r--r-- | src/DabMux.cpp | 17 | ||||
| -rw-r--r-- | src/RemoteControl.cpp | 369 | ||||
| -rw-r--r-- | src/RemoteControl.h | 267 | 
7 files changed, 464 insertions, 243 deletions
| diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 6a359b7..bb1e0e0 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -142,10 +142,9 @@ uint16_t get_announcement_flag_from_ptree(      return flags;  } -void parse_ptree(boost::property_tree::ptree& pt, -        std::shared_ptr<dabEnsemble> ensemble, -        std::shared_ptr<BaseRemoteController> rc -        ) +void parse_ptree( +        boost::property_tree::ptree& pt, +        std::shared_ptr<dabEnsemble> ensemble)  {      using boost::property_tree::ptree;      using boost::property_tree::ptree_error; @@ -247,7 +246,7 @@ void parse_ptree(boost::property_tree::ptree& pt,                      pt_announcement.get_child("flags"));              cl->subchanneluid = pt_announcement.get<string>("subchannel"); -            cl->enrol_at(*rc); +            rcs.enrol(cl.get());              ensemble->clusters.push_back(cl);          }      } @@ -395,7 +394,7 @@ void parse_ptree(boost::property_tree::ptree& pt,          try {              setup_subchannel_from_ptree(subchan, it->second, ensemble, -                    subchanuid, rc); +                    subchanuid);          }          catch (runtime_error &e) {              etiLog.log(error, @@ -555,14 +554,12 @@ void parse_ptree(boost::property_tree::ptree& pt,          ensemble->components.push_back(component);      } -  }  void setup_subchannel_from_ptree(DabSubchannel* subchan,          boost::property_tree::ptree &pt,          std::shared_ptr<dabEnsemble> ensemble, -        string subchanuid, -        std::shared_ptr<BaseRemoteController> rc) +        string subchanuid)  {      using boost::property_tree::ptree;      using boost::property_tree::ptree_error; @@ -657,7 +654,7 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan,              DabInputZmqMPEG* inzmq =                  new DabInputZmqMPEG(subchanuid, zmqconfig); -            inzmq->enrol_at(*rc); +            rcs.enrol(inzmq);              subchan->input     = inzmq;              if (proto == "epmg") { @@ -723,7 +720,7 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan,              DabInputZmqAAC* inzmq =                  new DabInputZmqAAC(subchanuid, zmqconfig); -            inzmq->enrol_at(*rc); +            rcs.enrol(inzmq);              subchan->input     = inzmq;              if (proto == "epmg") { diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 217572f..1297b90 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -39,14 +39,12 @@  #include <memory>  void parse_ptree(boost::property_tree::ptree& pt, -        std::shared_ptr<dabEnsemble> ensemble, -        std::shared_ptr<BaseRemoteController> rc); +        std::shared_ptr<dabEnsemble> ensemble);  void setup_subchannel_from_ptree(DabSubchannel* subchan,          boost::property_tree::ptree &pt,          std::shared_ptr<dabEnsemble> ensemble, -        std::string subchanuid, -        std::shared_ptr<BaseRemoteController> rc); +        std::string subchanuid);  #endif diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 3fe3078..90d3d02 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -67,11 +67,9 @@ const unsigned short BitRateTable[64] = {  };  DabMultiplexer::DabMultiplexer( -        std::shared_ptr<BaseRemoteController> rc,          boost::property_tree::ptree pt) :      RemoteControllable("mux"),      m_pt(pt), -    m_rc(rc),      timestamp(0),      MNSC_increment_time(false),      sync(0x49C5F8), @@ -79,8 +77,8 @@ DabMultiplexer::DabMultiplexer(      ensemble(std::make_shared<dabEnsemble>()),      fig_carousel(ensemble)  { -    RC_ADD_PARAMETER(carousel, -            "Set to 1 to use the new carousel"); +    RC_ADD_PARAMETER(frames, +            "Show number of frames generated [read-only]");  }  void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf) @@ -135,10 +133,10 @@ void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf)  // Run a set of checks on the configuration  void DabMultiplexer::prepare()  { -    parse_ptree(m_pt, ensemble, m_rc); +    parse_ptree(m_pt, ensemble); -    this->enrol_at(m_rc); -    ensemble->enrol_at(m_rc); +    rcs.enrol(this); +    rcs.enrol(ensemble.get());      prepare_subchannels();      prepare_services_components(); @@ -236,7 +234,7 @@ void DabMultiplexer::prepare_services_components()              throw MuxInitException();          } -        service->enrol_at(m_rc); +        rcs.enrol(service.get());          // Adjust components type for DAB+          while (component != ensemble->components.end()) { @@ -296,8 +294,7 @@ void DabMultiplexer::prepare_services_components()          component->packet.id = cur_packetid++; -        component->enrol_at(m_rc); - +        rcs.enrol(component);      }  } @@ -796,7 +793,12 @@ void DabMultiplexer::print_info(void)  void DabMultiplexer::set_parameter(const std::string& parameter,                 const std::string& value)  { -    if (0) { +    if (parameter == "frames") { +        stringstream ss; +        ss << "Parameter '" << parameter << +            "' of " << get_rc_name() << +            " is read-only"; +        throw ParameterError(ss.str());      }      else {          stringstream ss; @@ -810,7 +812,8 @@ void DabMultiplexer::set_parameter(const std::string& parameter,  const std::string DabMultiplexer::get_parameter(const std::string& parameter) const  {      stringstream ss; -    if (0) { +    if (parameter == "frames") { +        ss << currentFrame;      }      else {          ss << "Parameter '" << parameter << diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index e069da5..0d008be 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -55,7 +55,6 @@  class DabMultiplexer : public RemoteControllable {      public:          DabMultiplexer( -                std::shared_ptr<BaseRemoteController> rc,                  boost::property_tree::ptree pt);          void prepare(void); @@ -80,7 +79,6 @@ class DabMultiplexer : public RemoteControllable {          void prepare_data_inputs(void);          boost::property_tree::ptree m_pt; -        std::shared_ptr<BaseRemoteController> m_rc;          unsigned timestamp;          bool MNSC_increment_time; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 689b762..cc6c327 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -135,8 +135,6 @@ using namespace std;  using boost::property_tree::ptree;  using boost::property_tree::ptree_error; - -  volatile sig_atomic_t running = 1;  /* We are not allowed to use etiLog in the signal handler, @@ -276,16 +274,14 @@ int main(int argc, char *argv[])          /************** READ REMOTE CONTROL PARAMETERS *************/          int telnetport = pt.get<int>("remotecontrol.telnetport", 0); -        std::shared_ptr<BaseRemoteController> rc;          if (telnetport != 0) { -            rc = std::make_shared<RemoteControllerTelnet>(telnetport); -        } -        else { -            rc = std::make_shared<RemoteControllerDummy>(); +            auto rc = std::make_shared<RemoteControllerTelnet>(telnetport); + +            rcs.add_controller(rc);          } -        DabMultiplexer mux(rc, pt); +        DabMultiplexer mux(pt);          etiLog.level(info) <<                  PACKAGE_NAME << " " << @@ -460,9 +456,8 @@ int main(int argc, char *argv[])              }              /* Check every six seconds if the remote control is still working */ -            if ((currentFrame % 250 == 249) && rc->fault_detected()) { -                etiLog.level(warn) << "Detected Remote Control fault, restarting it"; -                rc->restart(); +            if (currentFrame % 250 == 249) { +                rcs.check_faults();              }              /* Same for statistics server */ diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 9ecb018..bca0b41 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -27,145 +27,131 @@  #include <iostream>  #include <string>  #include <boost/asio.hpp> -#include <boost/bind.hpp>  #include <boost/thread.hpp> -#include "Log.h"  #include "RemoteControl.h"  using boost::asio::ip::tcp;  using namespace std; -RemoteControllerTelnet::~RemoteControllerTelnet() -{ -    m_running = false; -    m_io_service.stop(); -    m_child_thread.join(); -} +RemoteControllers rcs;  void RemoteControllerTelnet::restart()  { -    m_restarter_thread = boost::thread(&RemoteControllerTelnet::restart_thread, +    m_restarter_thread = boost::thread( +            &RemoteControllerTelnet::restart_thread,              this, 0);  } +RemoteControllable::~RemoteControllable() { +    rcs.remove_controllable(this); +} + +std::list<std::string> RemoteControllable::get_supported_parameters() const { +    std::list<std::string> parameterlist; +    for (const auto& param : m_parameters) { +        parameterlist.push_back(param[0]); +    } +    return parameterlist; +} + +RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) { +    auto rc = std::find_if(controllables.begin(), controllables.end(), +            [&](RemoteControllable* r) { return r->get_rc_name() == name; }); + +    if (rc == controllables.end()) { +        throw ParameterError("Module name unknown"); +    } +    else { +        return *rc; +    } +} +  // This runs in a separate thread, because  // it would take too long to be done in the main loop  // thread.  void RemoteControllerTelnet::restart_thread(long)  {      m_running = false; -    m_io_service.stop(); -    m_child_thread.join(); +    if (m_port) { +        m_child_thread.interrupt(); +        m_child_thread.join(); +    }      m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0);  } -void RemoteControllerTelnet::handle_accept( -        const boost::system::error_code& boost_error, -        boost::shared_ptr< boost::asio::ip::tcp::socket > socket, -        boost::asio::ip::tcp::acceptor& acceptor) +void RemoteControllerTelnet::process(long)  { - -    const std::string welcome = "ODR-DabMux Remote Control CLI\n" -                                "Write 'help' for help.\n" -                                "**********\n"; -    const std::string prompt = "> "; +    std::string m_welcome = "ODR-DabMux Remote Control CLI\n" +                            "Write 'help' for help.\n" +                            "**********\n"; +    std::string m_prompt = "> ";      std::string in_message;      size_t length; -    if (boost_error) -    { -        etiLog.level(error) << "RC: Error accepting connection"; -        return; -    } -      try { -        etiLog.level(info) << "RC: Accepted"; - -        boost::system::error_code ignored_error; - -        boost::asio::write(*socket, boost::asio::buffer(welcome), -                boost::asio::transfer_all(), -                ignored_error); - -        while (m_running && in_message != "quit") { -            boost::asio::write(*socket, boost::asio::buffer(prompt), -                    boost::asio::transfer_all(), -                    ignored_error); +        boost::asio::io_service io_service; +        tcp::acceptor acceptor(io_service, tcp::endpoint( +                    boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); +        while (m_running) {              in_message = ""; -            boost::asio::streambuf buffer; -            length = boost::asio::read_until(*socket, buffer, "\n", ignored_error); +            tcp::socket socket(io_service); -            std::istream str(&buffer); -            std::getline(str, in_message); +            acceptor.accept(socket); -            if (length == 0) { -                etiLog.level(info) << "RC: Connection terminated"; -                break; -            } +            boost::system::error_code ignored_error; -            while (in_message.length() > 0 && -                    (in_message[in_message.length()-1] == '\r' || -                     in_message[in_message.length()-1] == '\n')) { -                in_message.erase(in_message.length()-1, 1); -            } +            boost::asio::write(socket, boost::asio::buffer(m_welcome), +                    boost::asio::transfer_all(), +                    ignored_error); -            if (in_message.length() == 0) { -                continue; -            } +            while (m_running && in_message != "quit") { +                boost::asio::write(socket, boost::asio::buffer(m_prompt), +                        boost::asio::transfer_all(), +                        ignored_error); -            etiLog.level(info) << "RC: Got message '" << in_message << "'"; +                in_message = ""; -            dispatch_command(*socket, in_message); -        } -        etiLog.level(info) << "RC: Closing socket"; -        socket->close(); -    } -    catch (std::exception& e) -    { -        etiLog.level(error) << "Remote control caught exception: " << e.what(); -    } -} +                boost::asio::streambuf buffer; +                length = boost::asio::read_until( socket, buffer, "\n", ignored_error); -void RemoteControllerTelnet::process(long) -{ -    m_running = true; - -    while (m_running) { -        m_io_service.reset(); +                std::istream str(&buffer);  +                std::getline(str, in_message); -        tcp::acceptor acceptor(m_io_service, tcp::endpoint( -                    boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); +                if (length == 0) { +                    std::cerr << "RC: Connection terminated" << std::endl; +                    break; +                } +                while (in_message.length() > 0 && +                        (in_message[in_message.length()-1] == '\r' || +                         in_message[in_message.length()-1] == '\n')) { +                    in_message.erase(in_message.length()-1, 1); +                } -        // Add a job to start accepting connections. -        boost::shared_ptr<tcp::socket> socket( -                new tcp::socket(acceptor.get_io_service())); +                if (in_message.length() == 0) { +                    continue; +                } -        // Add an accept call to the service.  This will prevent io_service::run() -        // from returning. -        etiLog.level(info) << "RC: Waiting for connection on port " << m_port; -        acceptor.async_accept(*socket, -                boost::bind(&RemoteControllerTelnet::handle_accept, -                    this, -                    boost::asio::placeholders::error, -                    socket, -                    boost::ref(acceptor))); +                std::cerr << "RC: Got message '" << in_message << "'" << std::endl; -        // Process event loop. -        m_io_service.run(); +                dispatch_command(socket, in_message); +            } +            std::cerr << "RC: Closing socket" << std::endl; +            socket.close(); +        } +    } +    catch (std::exception& e) { +        std::cerr << "Remote control caught exception: " << e.what() << std::endl; +        m_fault = true;      } - -    etiLog.level(info) << "RC: Leaving"; -    m_fault = true;  } -  void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)  {      vector<string> cmd = tokenise_(command); @@ -189,16 +175,12 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman          stringstream ss;          if (cmd.size() == 1) { -            for (list<RemoteControllable*>::iterator it = m_cohort.begin(); -                    it != m_cohort.end(); ++it) { -                ss << (*it)->get_rc_name() << endl; - -                list< vector<string> >::iterator param; -                list< vector<string> > params = (*it)->get_parameter_descriptions(); -                for (param = params.begin(); -                        param != params.end(); -                        ++param) { -                    ss << "\t" << (*param)[0] << " : " << (*param)[1] << endl; +            for (auto &controllable : rcs.controllables) { +                ss << controllable->get_rc_name() << endl; + +                list< vector<string> > params = controllable->get_parameter_descriptions(); +                for (auto ¶m : params) { +                    ss << "\t" << param[0] << " : " << param[1] << endl;                  }              }          } @@ -212,10 +194,9 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman          if (cmd.size() == 2) {              try {                  stringstream ss; -                list< vector<string> > r = get_param_list_values_(cmd[1]); -                for (list< vector<string> >::iterator it = r.begin(); -                        it != r.end(); ++it) { -                    ss << (*it)[0] << ": " << (*it)[1] << endl; +                list< vector<string> > r = rcs.get_param_list_values(cmd[1]); +                for (auto ¶m_val : r) { +                    ss << param_val[0] << ": " << param_val[1] << endl;                  }                  reply(socket, ss.str()); @@ -224,23 +205,21 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman                  reply(socket, e.what());              }          } -        else -        { +        else {              reply(socket, "Incorrect parameters for command 'show'");          }      }      else if (cmd[0] == "get") {          if (cmd.size() == 3) {              try { -                string r = get_param_(cmd[1], cmd[2]); +                string r = rcs.get_param(cmd[1], cmd[2]);                  reply(socket, r);              }              catch (ParameterError &e) {                  reply(socket, e.what());              }          } -        else -        { +        else {              reply(socket, "Incorrect parameters for command 'get'");          }      } @@ -256,7 +235,7 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman                      }                  } -                set_param_(cmd[1], cmd[2], new_param_value.str()); +                rcs.set_param(cmd[1], cmd[2], new_param_value.str());                  reply(socket, "ok");              }              catch (ParameterError &e) { @@ -288,3 +267,171 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)              ignored_error);  } + +#if 0 // #if defined(HAVE_ZEROMQ) + +void RemoteControllerZmq::restart() +{ +    m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); +} + +// This runs in a separate thread, because +// it would take too long to be done in the main loop +// thread. +void RemoteControllerZmq::restart_thread() +{ +    m_running = false; + +    if (!m_endpoint.empty()) { +        m_child_thread.interrupt(); +        m_child_thread.join(); +    } + +    m_child_thread = boost::thread(&RemoteControllerZmq::process, this); +} + +void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) +{ +    bool more = true; +    do { +        zmq::message_t msg; +        pSocket.recv(&msg); +        std::string incoming((char*)msg.data(), msg.size()); +        message.push_back(incoming); +        more = msg.more(); +    } while (more); +} + +void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) +{ +    zmq::message_t msg(2); +    char repCode[2] = {'o', 'k'}; +    memcpy ((void*) msg.data(), repCode, 2); +    pSocket.send(msg, 0); +} + +void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) +{ +    zmq::message_t msg1(4); +    char repCode[4] = {'f', 'a', 'i', 'l'}; +    memcpy ((void*) msg1.data(), repCode, 4); +    pSocket.send(msg1, ZMQ_SNDMORE); + +    zmq::message_t msg2(error.length()); +    memcpy ((void*) msg2.data(), error.c_str(), error.length()); +    pSocket.send(msg2, 0); +} + +void RemoteControllerZmq::process() +{ +    // create zmq reply socket for receiving ctrl parameters +    etiLog.level(info) << "Starting zmq remote control thread"; +    try { +        zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + +        // connect the socket +        int hwm = 100; +        int linger = 0; +        repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); +        repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +        repSocket.bind(m_endpoint.c_str()); + +        // create pollitem that polls the  ZMQ sockets +        zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; +        for (;;) { +            zmq::poll(pollItems, 1, 100); +            std::vector<std::string> msg; + +            if (pollItems[0].revents & ZMQ_POLLIN) { +                recv_all(repSocket, msg); + +                std::string command((char*)msg[0].data(), msg[0].size()); + +                if (msg.size() == 1 && command == "ping") { +                    send_ok_reply(repSocket); +                } +                else if (msg.size() == 1 && command == "list") { +                    size_t cohort_size = m_cohort.size(); +                    for (auto &controllable : m_cohort) { +                        std::stringstream ss; +                        ss << controllable->get_rc_name(); + +                        std::string msg_s = ss.str(); + +                        zmq::message_t msg(ss.str().size()); +                        memcpy ((void*) msg.data(), msg_s.data(), msg_s.size()); + +                        int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; +                        repSocket.send(msg, flag); +                    } +                } +                else if (msg.size() == 2 && command == "show") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    try { +                        list< vector<string> > r = get_param_list_values_(module); +                        size_t r_size = r.size(); +                        for (auto ¶m_val : r) { +                            std::stringstream ss; +                            ss << param_val[0] << ": " << param_val[1] << endl; +                            zmq::message_t msg(ss.str().size()); +                            memcpy(msg.data(), ss.str().data(), ss.str().size()); + +                            int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; +                            repSocket.send(msg, flag); +                        } +                    } +                    catch (ParameterError &e) { +                        send_fail_reply(repSocket, e.what()); +                    } +                } +                else if (msg.size() == 3 && command == "get") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); + +                    try { +                        std::string value = get_param_(module, parameter); +                        zmq::message_t msg(value.size()); +                        memcpy ((void*) msg.data(), value.data(), value.size()); +                        repSocket.send(msg, 0); +                    } +                    catch (ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else if (msg.size() == 4 && command == "set") { +                    std::string module((char*) msg[1].data(), msg[1].size()); +                    std::string parameter((char*) msg[2].data(), msg[2].size()); +                    std::string value((char*) msg[3].data(), msg[3].size()); + +                    try { +                        set_param_(module, parameter, value); +                        send_ok_reply(repSocket); +                    } +                    catch (ParameterError &err) { +                        send_fail_reply(repSocket, err.what()); +                    } +                } +                else { +                    send_fail_reply(repSocket, +                            "Unsupported command. commands: list, show, get, set"); +                } +            } + +            // check if thread is interrupted +            boost::this_thread::interruption_point(); +        } +        repSocket.close(); +    } +    catch (boost::thread_interrupted&) {} +    catch (zmq::error_t &e) { +        etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); +    } +    catch (std::exception& e) { +        etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); +        m_fault = true; +    } +} + +#endif + diff --git a/src/RemoteControl.h b/src/RemoteControl.h index e7bb7fe..df99386 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -3,10 +3,10 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2014 +   Copyright (C) 2016     Matthias P. Braendli, matthias.braendli@mpb.li -   This module adds remote-control capability to some of the dabmod modules. +   This module adds remote-control capability to some of the dabmux modules.     see testremotecontrol/test.cpp for an example of how to use this.   */  /* @@ -26,8 +26,15 @@     along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.   */ -#ifndef _REMOTECONTROL_H -#define _REMOTECONTROL_H +#pragma once + +#ifdef HAVE_CONFIG_H +#  include "config.h" +#endif + +#if defined(HAVE_ZEROMQ) +#  include "zmq.hpp" +#endif  #include <list>  #include <map> @@ -36,14 +43,13 @@  #include <atomic>  #include <iostream>  #include <boost/bind.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp>  #include <boost/asio.hpp>  #include <boost/foreach.hpp>  #include <boost/tokenizer.hpp>  #include <boost/thread.hpp>  #include <stdexcept> +#include "Log.h"  #define RC_ADD_PARAMETER(p, desc) {   \    std::vector<std::string> p; \ @@ -52,7 +58,6 @@    m_parameters.push_back(p); \  } -  class ParameterError : public std::exception  {      public: @@ -71,9 +76,6 @@ class RemoteControllable;   */  class BaseRemoteController {      public: -        /* Add a new controllable under this controller's command */ -        virtual void enrol(RemoteControllable* controllable) = 0; -          /* When this returns one, the remote controller cannot be           * used anymore, and must be restarted by dabmux           */ @@ -90,10 +92,13 @@ class BaseRemoteController {  /* Objects that support remote control must implement the following class */  class RemoteControllable {      public: +        RemoteControllable(const std::string& name) : +            m_name(name) {} -        RemoteControllable(std::string name) : m_name(name) {} +        RemoteControllable(const RemoteControllable& other) = delete; +        RemoteControllable& operator=(const RemoteControllable& other) = delete; -        virtual ~RemoteControllable() {} +        virtual ~RemoteControllable();          /* return a short name used to identify the controllable.           * It might be used in the commands the user has to type, so keep @@ -101,33 +106,19 @@ class RemoteControllable {           */          virtual std::string get_rc_name() const { return m_name; } -        /* Tell the controllable to enrol at the given controller */ -        virtual void enrol_at(BaseRemoteController& controller) { -            controller.enrol(this); -        } - -        virtual void enrol_at(std::shared_ptr<BaseRemoteController> controller) { -            controller->enrol(this); -        } -          /* Return a list of possible parameters that can be set */ -        virtual std::list<std::string> get_supported_parameters() const { -            std::list<std::string> parameterlist; -            for (std::list< std::vector<std::string> >::const_iterator it = m_parameters.begin(); -                    it != m_parameters.end(); ++it) { -                parameterlist.push_back((*it)[0]); -            } -            return parameterlist; -        } +        virtual std::list<std::string> get_supported_parameters() const;          /* Return a mapping of the descriptions of all parameters */          virtual std::list< std::vector<std::string> > -            get_parameter_descriptions() const { -            return m_parameters; -        } +            get_parameter_descriptions() const +            { +                return m_parameters; +            }          /* Base function to set parameters. */ -        virtual void set_parameter(const std::string& parameter, +        virtual void set_parameter( +                const std::string& parameter,                  const std::string& value) = 0;          /* Getting a parameter always returns a string. */ @@ -138,30 +129,93 @@ class RemoteControllable {          std::list< std::vector<std::string> > m_parameters;  }; +/* Holds all our remote controllers and controlled object. + */ +class RemoteControllers { +    public: +        void add_controller(std::shared_ptr<BaseRemoteController> rc) { +            m_controllers.push_back(rc); +        } + +        void enrol(RemoteControllable *rc) { +            controllables.push_back(rc); +        } + +        void remove_controllable(RemoteControllable *rc) { +            controllables.remove(rc); +        } + +        void check_faults() { +            for (auto &controller : m_controllers) { +                if (controller->fault_detected()) +                { +                    etiLog.level(warn) << +                            "Detected Remote Control fault, restarting it"; +                    controller->restart(); +                } +            } +        } + +        std::list< std::vector<std::string> > +            get_param_list_values(const std::string& name) { +            RemoteControllable* controllable = get_controllable_(name); + +            std::list< std::vector<std::string> > allparams; +            for (auto ¶m : controllable->get_supported_parameters()) { +                std::vector<std::string> item; +                item.push_back(param); +                item.push_back(controllable->get_parameter(param)); + +                allparams.push_back(item); +            } +            return allparams; +        } + +        std::string get_param(const std::string& name, const std::string& param) { +            RemoteControllable* controllable = get_controllable_(name); +            return controllable->get_parameter(param); +        } + +        void set_param(const std::string& name, const std::string& param, const std::string& value) { +            RemoteControllable* controllable = get_controllable_(name); +            return controllable->set_parameter(param, value); +        } + +        std::list<RemoteControllable*> controllables; + +    private: +        RemoteControllable* get_controllable_(const std::string& name); + +        std::list<std::shared_ptr<BaseRemoteController> > m_controllers; +}; + +extern RemoteControllers rcs; + +  /* Implements a Remote controller based on a simple telnet CLI   * that listens on localhost   */  class RemoteControllerTelnet : public BaseRemoteController {      public: -        RemoteControllerTelnet() : -            m_running(false), -            m_io_service(), -            m_fault(false), +        RemoteControllerTelnet() +            : m_running(false), m_fault(false),              m_port(0) { } -        RemoteControllerTelnet(int port) : -            m_running(false), -            m_io_service(), -            m_fault(false), -            m_port(port) -        { -            restart(); -        } +        RemoteControllerTelnet(int port) +            : m_running(true), m_fault(false), +            m_child_thread(&RemoteControllerTelnet::process, this, 0), +            m_port(port) { } -        ~RemoteControllerTelnet(); +        RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; +        RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; -        void enrol(RemoteControllable* controllable) { -            m_cohort.push_back(controllable); +        ~RemoteControllerTelnet() { +            m_running = false; +            m_fault = false; +            if (m_port) { +                m_child_thread.interrupt(); +                m_child_thread.join(); +            }          }          virtual bool fault_detected() { return m_fault; } @@ -178,14 +232,6 @@ class RemoteControllerTelnet : public BaseRemoteController {          void reply(boost::asio::ip::tcp::socket& socket, std::string message); -        void handle_accept( -                const boost::system::error_code& boost_error, -                boost::shared_ptr< boost::asio::ip::tcp::socket > socket, -                boost::asio::ip::tcp::acceptor& acceptor); - -        RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other); -        RemoteControllerTelnet(const RemoteControllerTelnet& other); -          std::vector<std::string> tokenise_(std::string message) {              std::vector<std::string> all_tokens; @@ -197,6 +243,67 @@ class RemoteControllerTelnet : public BaseRemoteController {              return all_tokens;          } +        std::atomic<bool> m_running; + +        /* This is set to true if a fault occurred */ +        std::atomic<bool> m_fault; +        boost::thread m_restarter_thread; + +        boost::thread m_child_thread; + +        int m_port; +}; + +#if 0 // #if defined(HAVE_ZEROMQ) +/* Implements a Remote controller using zmq transportlayer + * that listens on localhost + */ +class RemoteControllerZmq : public BaseRemoteController { +    public: +        RemoteControllerZmq() +            : m_running(false), m_fault(false), +            m_zmqContext(1), +            m_endpoint("") { } + +        RemoteControllerZmq(std::string endpoint) +            : m_running(true), m_fault(false), +            m_zmqContext(1), +            m_endpoint(endpoint), +            m_child_thread(&RemoteControllerZmq::process, this) { } + +        RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; +        RemoteControllerZmq(const RemoteControllerZmq& other) = delete; + +        ~RemoteControllerZmq() { +            m_running = false; +            m_fault = false; +            if (!m_endpoint.empty()) { +                m_child_thread.interrupt(); +                m_child_thread.join(); +            } +        } + +        void enrol(RemoteControllable* controllable) { +            m_cohort.push_back(controllable); +        } + +        void disengage(RemoteControllable* controllable) { +            m_cohort.remove(controllable); +        } + +        virtual bool fault_detected() { return m_fault; } + +        virtual void restart(); + +    private: +        void restart_thread(); + +        void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message); +        void send_ok_reply(zmq::socket_t &pSocket); +        void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); +        void process(); + +          RemoteControllable* get_controllable_(std::string name) {              for (std::list<RemoteControllable*>::iterator it = m_cohort.begin();                      it != m_cohort.end(); ++it) { @@ -208,15 +315,14 @@ class RemoteControllerTelnet : public BaseRemoteController {              throw ParameterError("Module name unknown");          } -        std::list< std::vector<std::string> > -            get_parameter_descriptions_(std::string name) { +        std::string get_param_(std::string name, std::string param) {              RemoteControllable* controllable = get_controllable_(name); -            return controllable->get_parameter_descriptions(); +            return controllable->get_parameter(param);          } -        std::list<std::string> get_param_list_(std::string name) { +        void set_param_(std::string name, std::string param, std::string value) {              RemoteControllable* controllable = get_controllable_(name); -            return controllable->get_supported_parameters(); +            return controllable->set_parameter(param, value);          }          std::list< std::vector<std::string> > @@ -224,55 +330,32 @@ class RemoteControllerTelnet : public BaseRemoteController {              RemoteControllable* controllable = get_controllable_(name);              std::list< std::vector<std::string> > allparams; -            std::list<std::string> params = controllable->get_supported_parameters(); -            for (std::list<std::string>::iterator it = params.begin(); -                    it != params.end(); ++it) { + +            for (auto ¶m : controllable->get_supported_parameters()) {                  std::vector<std::string> item; -                item.push_back(*it); -                item.push_back(controllable->get_parameter(*it)); +                item.push_back(param); +                item.push_back(controllable->get_parameter(param));                  allparams.push_back(item);              } +              return allparams;          } -        std::string get_param_(std::string name, std::string param) { -            RemoteControllable* controllable = get_controllable_(name); -            return controllable->get_parameter(param); -        } - -        void set_param_(std::string name, std::string param, std::string value) { -            RemoteControllable* controllable = get_controllable_(name); -            return controllable->set_parameter(param, value); -        }          std::atomic<bool> m_running; -        boost::asio::io_service m_io_service; -          /* This is set to true if a fault occurred */          std::atomic<bool> m_fault;          boost::thread m_restarter_thread; -        boost::thread m_child_thread; +        zmq::context_t m_zmqContext;          /* This controller commands the controllables in the cohort */          std::list<RemoteControllable*> m_cohort; -        int m_port; -}; - - -/* The Dummy remote controller does nothing, and never fails - */ -class RemoteControllerDummy : public BaseRemoteController { -    public: -        void enrol(RemoteControllable*) {} - -        bool fault_detected() { return false; } - -        virtual void restart() {} +        std::string m_endpoint; +        boost::thread m_child_thread;  }; -  #endif | 
