/*
Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include <cstring>
#include <iostream>
#include <list>
#include <sstream>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include "RemoteControl.h"
using boost::asio::ip::tcp;
using namespace std;
// Global registry object used throughout this file: controllables remove
// themselves from it on destruction, and both the telnet and the ZeroMQ
// front-ends query it to list modules and to get/set their parameters.
RemoteControllers rcs;
void RemoteControllerTelnet::restart()
{
m_restarter_thread = boost::thread(
&RemoteControllerTelnet::restart_thread,
this, 0);
}
// On destruction, ask the global `rcs` registry to remove this object.
RemoteControllable::~RemoteControllable()
{
    rcs.remove_controllable(this);
}
std::list RemoteControllable::get_supported_parameters() const {
std::list parameterlist;
for (const auto& param : m_parameters) {
parameterlist.push_back(param[0]);
}
return parameterlist;
}
// Look up a registered controllable by its RC name.
//
// Returns the first entry of `controllables` whose get_rc_name() equals
// `name`; throws ParameterError("Module name unknown") when there is none.
RemoteControllable* RemoteControllers::get_controllable_(const std::string& name)
{
    for (RemoteControllable* candidate : controllables) {
        if (candidate->get_rc_name() == name) {
            return candidate;
        }
    }

    throw ParameterError("Module name unknown");
}
// This runs in a separate thread, because
// it would take too long to be done in the main loop
// thread.
void RemoteControllerTelnet::restart_thread(long)
{
m_running = false;
if (m_port) {
m_child_thread.interrupt();
m_child_thread.join();
}
m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0);
}
void RemoteControllerTelnet::process(long)
{
std::string m_welcome = "ODR-DabMod Remote Control CLI\n"
"Write 'help' for help.\n"
"**********\n";
std::string m_prompt = "> ";
std::string in_message;
size_t length;
try {
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(
boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
while (m_running) {
in_message = "";
tcp::socket socket(io_service);
acceptor.accept(socket);
boost::system::error_code ignored_error;
boost::asio::write(socket, boost::asio::buffer(m_welcome),
boost::asio::transfer_all(),
ignored_error);
while (m_running && in_message != "quit") {
boost::asio::write(socket, boost::asio::buffer(m_prompt),
boost::asio::transfer_all(),
ignored_error);
in_message = "";
boost::asio::streambuf buffer;
length = boost::asio::read_until( socket, buffer, "\n", ignored_error);
std::istream str(&buffer);
std::getline(str, in_message);
if (length == 0) {
std::cerr << "RC: Connection terminated" << std::endl;
break;
}
while (in_message.length() > 0 &&
(in_message[in_message.length()-1] == '\r' ||
in_message[in_message.length()-1] == '\n')) {
in_message.erase(in_message.length()-1, 1);
}
if (in_message.length() == 0) {
continue;
}
std::cerr << "RC: Got message '" << in_message << "'" << std::endl;
dispatch_command(socket, in_message);
}
std::cerr << "RC: Closing socket" << std::endl;
socket.close();
}
}
catch (std::exception& e) {
std::cerr << "Remote control caught exception: " << e.what() << std::endl;
m_fault = true;
}
}
// Parse one command line from a telnet client and answer it on `socket`.
//
// The line is split with tokenise_(); the first token selects the command:
//   help                       - print the command summary
//   list                       - every module with its parameter descriptions
//   show MODULE                - every parameter of MODULE with its value
//   get MODULE PARAMETER       - value of one parameter
//   set MODULE PARAMETER VALUE - assign a parameter; extra tokens are joined
//                                to VALUE with single spaces
//   quit                       - say goodbye (the caller ends the session)
//
// Every recognised or unrecognised command produces exactly one reply.
// A line that yields no tokens (e.g. only whitespace) is ignored without a
// reply. ParameterError messages are forwarded to the client verbatim.
void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
{
    const vector<string> cmd = tokenise_(command);

    // Guard against indexing cmd[0] when the line contained no tokens.
    if (cmd.empty()) {
        return;
    }

    const string& verb = cmd[0];

    if (verb == "help") {
        reply(socket,
                "The following commands are supported:\n"
                " list\n"
                " * Lists the modules that are loaded and their parameters\n"
                " show MODULE\n"
                " * Lists all parameters and their values from module MODULE\n"
                " get MODULE PARAMETER\n"
                " * Gets the value for the specified PARAMETER from module MODULE\n"
                " set MODULE PARAMETER VALUE\n"
                " * Sets the value for the PARAMETER for module MODULE\n"
                " quit\n"
                " * Terminate this session\n"
                "\n");
    }
    else if (verb == "list") {
        if (cmd.size() == 1) {
            stringstream ss;
            for (const auto& controllable : rcs.controllables) {
                ss << controllable->get_rc_name() << endl;

                // Each description is indexed as [0] = name, [1] = text.
                const auto params = controllable->get_parameter_descriptions();
                for (const auto& param : params) {
                    ss << "\t" << param[0] << " : " << param[1] << endl;
                }
            }
            reply(socket, ss.str());
        }
        else {
            // Only the error is sent; no additional empty reply follows.
            reply(socket, "Too many arguments for command 'list'");
        }
    }
    else if (verb == "show") {
        if (cmd.size() == 2) {
            try {
                stringstream ss;
                const auto values = rcs.get_param_list_values(cmd[1]);
                for (const auto& param_val : values) {
                    ss << param_val[0] << ": " << param_val[1] << endl;
                }
                reply(socket, ss.str());
            }
            catch (const ParameterError& e) {
                reply(socket, e.what());
            }
        }
        else {
            reply(socket, "Incorrect parameters for command 'show'");
        }
    }
    else if (verb == "get") {
        if (cmd.size() == 3) {
            try {
                const string value = rcs.get_param(cmd[1], cmd[2]);
                reply(socket, value);
            }
            catch (const ParameterError& e) {
                reply(socket, e.what());
            }
        }
        else {
            reply(socket, "Incorrect parameters for command 'get'");
        }
    }
    else if (verb == "set") {
        if (cmd.size() >= 4) {
            try {
                // Re-join everything after the parameter name so that values
                // containing spaces survive tokenisation.
                stringstream new_param_value;
                for (size_t i = 3; i < cmd.size(); i++) {
                    if (i > 3) {
                        new_param_value << " ";
                    }
                    new_param_value << cmd[i];
                }

                rcs.set_param(cmd[1], cmd[2], new_param_value.str());
                reply(socket, "ok");
            }
            catch (const ParameterError& e) {
                reply(socket, e.what());
            }
            catch (const exception&) {
                // Anything else thrown while applying the value is reported
                // as a bad value rather than tearing down the session.
                reply(socket, "Error: Invalid parameter value. ");
            }
        }
        else {
            reply(socket, "Incorrect parameters for command 'set'");
        }
    }
    else if (verb == "quit") {
        reply(socket, "Goodbye");
    }
    else {
        reply(socket, "Message not understood");
    }
}
// Send `message` followed by CR LF to the client.
//
// Write errors are collected in a local error_code and intentionally
// discarded.
void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
{
    const string line = message + "\r\n";

    boost::system::error_code ignored_error;
    boost::asio::write(socket, boost::asio::buffer(line),
            boost::asio::transfer_all(),
            ignored_error);
}
#if defined(HAVE_ZEROMQ)
void RemoteControllerZmq::restart()
{
m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this);
}
// This runs in a separate thread, because
// it would take too long to be done in the main loop
// thread.
void RemoteControllerZmq::restart_thread()
{
m_running = false;
if (!m_endpoint.empty()) {
m_child_thread.interrupt();
m_child_thread.join();
}
m_child_thread = boost::thread(&RemoteControllerZmq::process, this);
}
void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector &message)
{
bool more = true;
do {
zmq::message_t msg;
pSocket.recv(&msg);
std::string incoming((char*)msg.data(), msg.size());
message.push_back(incoming);
more = msg.more();
} while (more);
}
// Send the single-frame reply "ok" (two bytes, no terminator) on `pSocket`.
void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)
{
    static const char ok_text[] = "ok";
    const size_t ok_len = sizeof(ok_text) - 1; // exclude the trailing NUL

    zmq::message_t msg(ok_len);
    memcpy(msg.data(), ok_text, ok_len);
    pSocket.send(msg, 0);
}
// Send a two-frame failure reply on `pSocket`: first the four bytes "fail"
// (flagged as having more frames), then the bytes of `error`.
void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error)
{
    static const char fail_text[] = "fail";
    const size_t fail_len = sizeof(fail_text) - 1; // exclude the trailing NUL

    zmq::message_t header(fail_len);
    memcpy(header.data(), fail_text, fail_len);
    pSocket.send(header, ZMQ_SNDMORE);

    const size_t detail_len = error.length();
    zmq::message_t detail(detail_len);
    memcpy(detail.data(), error.c_str(), detail_len);
    pSocket.send(detail, 0);
}
void RemoteControllerZmq::process()
{
// create zmq reply socket for receiving ctrl parameters
etiLog.level(info) << "Starting zmq remote control thread";
try {
zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
// connect the socket
int hwm = 100;
int linger = 0;
repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
repSocket.bind(m_endpoint.c_str());
// create pollitem that polls the ZMQ sockets
zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
for (;;) {
zmq::poll(pollItems, 1, 100);
std::vector msg;
if (pollItems[0].revents & ZMQ_POLLIN) {
recv_all(repSocket, msg);
std::string command((char*)msg[0].data(), msg[0].size());
if (msg.size() == 1 && command == "ping") {
send_ok_reply(repSocket);
}
else if (msg.size() == 1 && command == "list") {
size_t cohort_size = rcs.controllables.size();
for (auto &controllable : rcs.controllables) {
std::stringstream ss;
ss << controllable->get_rc_name();
std::string msg_s = ss.str();
zmq::message_t msg(ss.str().size());
memcpy ((void*) msg.data(), msg_s.data(), msg_s.size());
int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0;
repSocket.send(msg, flag);
}
}
else if (msg.size() == 2 && command == "show") {
std::string module((char*) msg[1].data(), msg[1].size());
try {
list< vector > r = rcs.get_param_list_values(module);
size_t r_size = r.size();
for (auto ¶m_val : r) {
std::stringstream ss;
ss << param_val[0] << ": " << param_val[1] << endl;
zmq::message_t msg(ss.str().size());
memcpy(msg.data(), ss.str().data(), ss.str().size());
int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0;
repSocket.send(msg, flag);
}
}
catch (ParameterError &e) {
send_fail_reply(repSocket, e.what());
}
}
else if (msg.size() == 3 && command == "get") {
std::string module((char*) msg[1].data(), msg[1].size());
std::string parameter((char*) msg[2].data(), msg[2].size());
try {
std::string value = rcs.get_param(module, parameter);
zmq::message_t msg(value.size());
memcpy ((void*) msg.data(), value.data(), value.size());
repSocket.send(msg, 0);
}
catch (ParameterError &err) {
send_fail_reply(repSocket, err.what());
}
}
else if (msg.size() == 4 && command == "set") {
std::string module((char*) msg[1].data(), msg[1].size());
std::string parameter((char*) msg[2].data(), msg[2].size());
std::string value((char*) msg[3].data(), msg[3].size());
try {
rcs.set_param(module, parameter, value);
send_ok_reply(repSocket);
}
catch (ParameterError &err) {
send_fail_reply(repSocket, err.what());
}
}
else {
send_fail_reply(repSocket,
"Unsupported command. commands: list, show, get, set");
}
}
// check if thread is interrupted
boost::this_thread::interruption_point();
}
repSocket.close();
}
catch (boost::thread_interrupted&) {}
catch (zmq::error_t &e) {
etiLog.level(error) << "ZMQ RC error: " << std::string(e.what());
}
catch (std::exception& e) {
etiLog.level(error) << "ZMQ RC caught exception: " << e.what();
m_fault = true;
}
}
#endif