summaryrefslogtreecommitdiffstats
path: root/src/RemoteControl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/RemoteControl.cpp')
-rw-r--r--src/RemoteControl.cpp369
1 files changed, 258 insertions, 111 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 9ecb018..12ab84e 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -27,145 +27,131 @@
#include <iostream>
#include <string>
#include <boost/asio.hpp>
-#include <boost/bind.hpp>
#include <boost/thread.hpp>
-#include "Log.h"
#include "RemoteControl.h"
using boost::asio::ip::tcp;
using namespace std;
-RemoteControllerTelnet::~RemoteControllerTelnet()
-{
- m_running = false;
- m_io_service.stop();
- m_child_thread.join();
-}
+RemoteControllers rcs;
void RemoteControllerTelnet::restart()
{
- m_restarter_thread = boost::thread(&RemoteControllerTelnet::restart_thread,
+ m_restarter_thread = boost::thread(
+ &RemoteControllerTelnet::restart_thread,
this, 0);
}
+RemoteControllable::~RemoteControllable() {
+ rcs.remove_controllable(this);
+}
+
+std::list<std::string> RemoteControllable::get_supported_parameters() const {
+ std::list<std::string> parameterlist;
+ for (const auto& param : m_parameters) {
+ parameterlist.push_back(param[0]);
+ }
+ return parameterlist;
+}
+
+RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) {
+ auto rc = std::find_if(controllables.begin(), controllables.end(),
+ [&](RemoteControllable* r) { return r->get_rc_name() == name; });
+
+ if (rc == controllables.end()) {
+ throw ParameterError("Module name unknown");
+ }
+ else {
+ return *rc;
+ }
+}
+
// This runs in a separate thread, because
// it would take too long to be done in the main loop
// thread.
void RemoteControllerTelnet::restart_thread(long)
{
m_running = false;
- m_io_service.stop();
- m_child_thread.join();
+ if (m_port) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0);
}
-void RemoteControllerTelnet::handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor)
+void RemoteControllerTelnet::process(long)
{
-
- const std::string welcome = "ODR-DabMux Remote Control CLI\n"
- "Write 'help' for help.\n"
- "**********\n";
- const std::string prompt = "> ";
+ std::string m_welcome = "ODR-DabMux Remote Control CLI\n"
+ "Write 'help' for help.\n"
+ "**********\n";
+ std::string m_prompt = "> ";
std::string in_message;
size_t length;
- if (boost_error)
- {
- etiLog.level(error) << "RC: Error accepting connection";
- return;
- }
-
try {
- etiLog.level(info) << "RC: Accepted";
-
- boost::system::error_code ignored_error;
-
- boost::asio::write(*socket, boost::asio::buffer(welcome),
- boost::asio::transfer_all(),
- ignored_error);
-
- while (m_running && in_message != "quit") {
- boost::asio::write(*socket, boost::asio::buffer(prompt),
- boost::asio::transfer_all(),
- ignored_error);
+ boost::asio::io_service io_service;
+ tcp::acceptor acceptor(io_service, tcp::endpoint(
+ boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
+ while (m_running) {
in_message = "";
- boost::asio::streambuf buffer;
- length = boost::asio::read_until(*socket, buffer, "\n", ignored_error);
+ tcp::socket socket(io_service);
- std::istream str(&buffer);
- std::getline(str, in_message);
+ acceptor.accept(socket);
- if (length == 0) {
- etiLog.level(info) << "RC: Connection terminated";
- break;
- }
+ boost::system::error_code ignored_error;
- while (in_message.length() > 0 &&
- (in_message[in_message.length()-1] == '\r' ||
- in_message[in_message.length()-1] == '\n')) {
- in_message.erase(in_message.length()-1, 1);
- }
+ boost::asio::write(socket, boost::asio::buffer(m_welcome),
+ boost::asio::transfer_all(),
+ ignored_error);
- if (in_message.length() == 0) {
- continue;
- }
+ while (m_running && in_message != "quit") {
+ boost::asio::write(socket, boost::asio::buffer(m_prompt),
+ boost::asio::transfer_all(),
+ ignored_error);
- etiLog.level(info) << "RC: Got message '" << in_message << "'";
+ in_message = "";
- dispatch_command(*socket, in_message);
- }
- etiLog.level(info) << "RC: Closing socket";
- socket->close();
- }
- catch (std::exception& e)
- {
- etiLog.level(error) << "Remote control caught exception: " << e.what();
- }
-}
+ boost::asio::streambuf buffer;
+ length = boost::asio::read_until( socket, buffer, "\n", ignored_error);
-void RemoteControllerTelnet::process(long)
-{
- m_running = true;
-
- while (m_running) {
- m_io_service.reset();
+ std::istream str(&buffer);
+ std::getline(str, in_message);
- tcp::acceptor acceptor(m_io_service, tcp::endpoint(
- boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
+ if (length == 0) {
+ std::cerr << "RC: Connection terminated" << std::endl;
+ break;
+ }
+ while (in_message.length() > 0 &&
+ (in_message[in_message.length()-1] == '\r' ||
+ in_message[in_message.length()-1] == '\n')) {
+ in_message.erase(in_message.length()-1, 1);
+ }
- // Add a job to start accepting connections.
- boost::shared_ptr<tcp::socket> socket(
- new tcp::socket(acceptor.get_io_service()));
+ if (in_message.length() == 0) {
+ continue;
+ }
- // Add an accept call to the service. This will prevent io_service::run()
- // from returning.
- etiLog.level(info) << "RC: Waiting for connection on port " << m_port;
- acceptor.async_accept(*socket,
- boost::bind(&RemoteControllerTelnet::handle_accept,
- this,
- boost::asio::placeholders::error,
- socket,
- boost::ref(acceptor)));
+ std::cerr << "RC: Got message '" << in_message << "'" << std::endl;
- // Process event loop.
- m_io_service.run();
+ dispatch_command(socket, in_message);
+ }
+ std::cerr << "RC: Closing socket" << std::endl;
+ socket.close();
+ }
+ }
+ catch (std::exception& e) {
+ std::cerr << "Remote control caught exception: " << e.what() << std::endl;
+ m_fault = true;
}
-
- etiLog.level(info) << "RC: Leaving";
- m_fault = true;
}
-
void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
{
vector<string> cmd = tokenise_(command);
@@ -189,16 +175,12 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
stringstream ss;
if (cmd.size() == 1) {
- for (list<RemoteControllable*>::iterator it = m_cohort.begin();
- it != m_cohort.end(); ++it) {
- ss << (*it)->get_rc_name() << endl;
-
- list< vector<string> >::iterator param;
- list< vector<string> > params = (*it)->get_parameter_descriptions();
- for (param = params.begin();
- param != params.end();
- ++param) {
- ss << "\t" << (*param)[0] << " : " << (*param)[1] << endl;
+ for (auto &controllable : rcs.controllables) {
+ ss << controllable->get_rc_name() << endl;
+
+ list< vector<string> > params = controllable->get_parameter_descriptions();
+ for (auto &param : params) {
+ ss << "\t" << param[0] << " : " << param[1] << endl;
}
}
}
@@ -212,10 +194,9 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
if (cmd.size() == 2) {
try {
stringstream ss;
- list< vector<string> > r = get_param_list_values_(cmd[1]);
- for (list< vector<string> >::iterator it = r.begin();
- it != r.end(); ++it) {
- ss << (*it)[0] << ": " << (*it)[1] << endl;
+ list< vector<string> > r = rcs.get_param_list_values(cmd[1]);
+ for (auto &param_val : r) {
+ ss << param_val[0] << ": " << param_val[1] << endl;
}
reply(socket, ss.str());
@@ -224,23 +205,21 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
reply(socket, e.what());
}
}
- else
- {
+ else {
reply(socket, "Incorrect parameters for command 'show'");
}
}
else if (cmd[0] == "get") {
if (cmd.size() == 3) {
try {
- string r = get_param_(cmd[1], cmd[2]);
+ string r = rcs.get_param(cmd[1], cmd[2]);
reply(socket, r);
}
catch (ParameterError &e) {
reply(socket, e.what());
}
}
- else
- {
+ else {
reply(socket, "Incorrect parameters for command 'get'");
}
}
@@ -256,7 +235,7 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
}
}
- set_param_(cmd[1], cmd[2], new_param_value.str());
+ rcs.set_param(cmd[1], cmd[2], new_param_value.str());
reply(socket, "ok");
}
catch (ParameterError &e) {
@@ -288,3 +267,171 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
ignored_error);
}
+
+#if defined(HAVE_RC_ZEROMQ)
+
+void RemoteControllerZmq::restart()
+{
+ m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this);
+}
+
+// This runs in a separate thread, because
+// it would take too long to be done in the main loop
+// thread.
+void RemoteControllerZmq::restart_thread()
+{
+ m_running = false;
+
+ if (!m_endpoint.empty()) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
+
+ m_child_thread = boost::thread(&RemoteControllerZmq::process, this);
+}
+
+void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message)
+{
+ bool more = true;
+ do {
+ zmq::message_t msg;
+ pSocket.recv(&msg);
+ std::string incoming((char*)msg.data(), msg.size());
+ message.push_back(incoming);
+ more = msg.more();
+ } while (more);
+}
+
+void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)
+{
+ zmq::message_t msg(2);
+ char repCode[2] = {'o', 'k'};
+ memcpy ((void*) msg.data(), repCode, 2);
+ pSocket.send(msg, 0);
+}
+
+void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error)
+{
+ zmq::message_t msg1(4);
+ char repCode[4] = {'f', 'a', 'i', 'l'};
+ memcpy ((void*) msg1.data(), repCode, 4);
+ pSocket.send(msg1, ZMQ_SNDMORE);
+
+ zmq::message_t msg2(error.length());
+ memcpy ((void*) msg2.data(), error.c_str(), error.length());
+ pSocket.send(msg2, 0);
+}
+
+void RemoteControllerZmq::process()
+{
+ // create zmq reply socket for receiving ctrl parameters
+ etiLog.level(info) << "Starting zmq remote control thread";
+ try {
+ zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
+
+ // connect the socket
+ int hwm = 100;
+ int linger = 0;
+ repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
+ repSocket.bind(m_endpoint.c_str());
+
+ // create pollitem that polls the ZMQ sockets
+ zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
+ for (;;) {
+ zmq::poll(pollItems, 1, 100);
+ std::vector<std::string> msg;
+
+ if (pollItems[0].revents & ZMQ_POLLIN) {
+ recv_all(repSocket, msg);
+
+ std::string command((char*)msg[0].data(), msg[0].size());
+
+ if (msg.size() == 1 && command == "ping") {
+ send_ok_reply(repSocket);
+ }
+ else if (msg.size() == 1 && command == "list") {
+ size_t cohort_size = rcs.controllables.size();
+ for (auto &controllable : rcs.controllables) {
+ std::stringstream ss;
+ ss << controllable->get_rc_name();
+
+ std::string msg_s = ss.str();
+
+ zmq::message_t msg(ss.str().size());
+ memcpy ((void*) msg.data(), msg_s.data(), msg_s.size());
+
+ int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0;
+ repSocket.send(msg, flag);
+ }
+ }
+ else if (msg.size() == 2 && command == "show") {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ try {
+ list< vector<string> > r = rcs.get_param_list_values(module);
+ size_t r_size = r.size();
+ for (auto &param_val : r) {
+ std::stringstream ss;
+ ss << param_val[0] << ": " << param_val[1] << endl;
+ zmq::message_t msg(ss.str().size());
+ memcpy(msg.data(), ss.str().data(), ss.str().size());
+
+ int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0;
+ repSocket.send(msg, flag);
+ }
+ }
+ catch (ParameterError &e) {
+ send_fail_reply(repSocket, e.what());
+ }
+ }
+ else if (msg.size() == 3 && command == "get") {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+
+ try {
+ std::string value = rcs.get_param(module, parameter);
+ zmq::message_t msg(value.size());
+ memcpy ((void*) msg.data(), value.data(), value.size());
+ repSocket.send(msg, 0);
+ }
+ catch (ParameterError &err) {
+ send_fail_reply(repSocket, err.what());
+ }
+ }
+ else if (msg.size() == 4 && command == "set") {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+ std::string value((char*) msg[3].data(), msg[3].size());
+
+ try {
+ rcs.set_param(module, parameter, value);
+ send_ok_reply(repSocket);
+ }
+ catch (ParameterError &err) {
+ send_fail_reply(repSocket, err.what());
+ }
+ }
+ else {
+ send_fail_reply(repSocket,
+ "Unsupported command. commands: list, show, get, set");
+ }
+ }
+
+ // check if thread is interrupted
+ boost::this_thread::interruption_point();
+ }
+ repSocket.close();
+ }
+ catch (boost::thread_interrupted&) {}
+ catch (zmq::error_t &e) {
+ etiLog.level(error) << "ZMQ RC error: " << std::string(e.what());
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) << "ZMQ RC caught exception: " << e.what();
+ m_fault = true;
+ }
+}
+
+#endif
+