summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-10-07 16:35:15 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-10-07 16:35:15 +0200
commit17623dd6b6d800e15b3a27de4d14fd1a3a160b1b (patch)
tree9a34766f1bda012f06bf18abcea687c8a4741dd1 /src
parenta951ea2f636360f724ef35e8aabd859e46d42290 (diff)
parent17e6a246149c11bac667a233fff1a33a1d06a1fb (diff)
downloaddabmux-17623dd6b6d800e15b3a27de4d14fd1a3a160b1b.tar.gz
dabmux-17623dd6b6d800e15b3a27de4d14fd1a3a160b1b.tar.bz2
dabmux-17623dd6b6d800e15b3a27de4d14fd1a3a160b1b.zip
Merge 'next' into servicelinking
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp26
-rw-r--r--src/ConfigParser.h6
-rw-r--r--src/DabMultiplexer.cpp27
-rw-r--r--src/DabMultiplexer.h2
-rw-r--r--src/DabMux.cpp22
-rw-r--r--src/RemoteControl.cpp369
-rw-r--r--src/RemoteControl.h264
7 files changed, 444 insertions, 272 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index cb458d7..1876697 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -143,10 +143,8 @@ uint16_t get_announcement_flag_from_ptree(
}
// Parse the linkage section
-void parse_linkage(boost::property_tree::ptree& pt,
- std::shared_ptr<dabEnsemble> ensemble,
- std::shared_ptr<BaseRemoteController> rc
- )
+static void parse_linkage(boost::property_tree::ptree& pt,
+ std::shared_ptr<dabEnsemble> ensemble)
{
using boost::property_tree::ptree;
using boost::property_tree::ptree_error;
@@ -230,10 +228,9 @@ void parse_linkage(boost::property_tree::ptree& pt,
}
}
-void parse_ptree(boost::property_tree::ptree& pt,
- std::shared_ptr<dabEnsemble> ensemble,
- std::shared_ptr<BaseRemoteController> rc
- )
+void parse_ptree(
+ boost::property_tree::ptree& pt,
+ std::shared_ptr<dabEnsemble> ensemble)
{
using boost::property_tree::ptree;
using boost::property_tree::ptree_error;
@@ -335,7 +332,7 @@ void parse_ptree(boost::property_tree::ptree& pt,
pt_announcement.get_child("flags"));
cl->subchanneluid = pt_announcement.get<string>("subchannel");
- cl->enrol_at(*rc);
+ rcs.enrol(cl.get());
ensemble->clusters.push_back(cl);
}
}
@@ -483,7 +480,7 @@ void parse_ptree(boost::property_tree::ptree& pt,
try {
setup_subchannel_from_ptree(subchan, it->second, ensemble,
- subchanuid, rc);
+ subchanuid);
}
catch (runtime_error &e) {
etiLog.log(error,
@@ -644,14 +641,13 @@ void parse_ptree(boost::property_tree::ptree& pt,
}
- parse_linkage(pt, ensemble, rc);
+ parse_linkage(pt, ensemble);
}
void setup_subchannel_from_ptree(DabSubchannel* subchan,
boost::property_tree::ptree &pt,
std::shared_ptr<dabEnsemble> ensemble,
- string subchanuid,
- std::shared_ptr<BaseRemoteController> rc)
+ string subchanuid)
{
using boost::property_tree::ptree;
using boost::property_tree::ptree_error;
@@ -746,7 +742,7 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan,
DabInputZmqMPEG* inzmq =
new DabInputZmqMPEG(subchanuid, zmqconfig);
- inzmq->enrol_at(*rc);
+ rcs.enrol(inzmq);
subchan->input = inzmq;
if (proto == "epmg") {
@@ -812,7 +808,7 @@ void setup_subchannel_from_ptree(DabSubchannel* subchan,
DabInputZmqAAC* inzmq =
new DabInputZmqAAC(subchanuid, zmqconfig);
- inzmq->enrol_at(*rc);
+ rcs.enrol(inzmq);
subchan->input = inzmq;
if (proto == "epmg") {
diff --git a/src/ConfigParser.h b/src/ConfigParser.h
index 217572f..1297b90 100644
--- a/src/ConfigParser.h
+++ b/src/ConfigParser.h
@@ -39,14 +39,12 @@
#include <memory>
void parse_ptree(boost::property_tree::ptree& pt,
- std::shared_ptr<dabEnsemble> ensemble,
- std::shared_ptr<BaseRemoteController> rc);
+ std::shared_ptr<dabEnsemble> ensemble);
void setup_subchannel_from_ptree(DabSubchannel* subchan,
boost::property_tree::ptree &pt,
std::shared_ptr<dabEnsemble> ensemble,
- std::string subchanuid,
- std::shared_ptr<BaseRemoteController> rc);
+ std::string subchanuid);
#endif
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index 3fe3078..90d3d02 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -67,11 +67,9 @@ const unsigned short BitRateTable[64] = {
};
DabMultiplexer::DabMultiplexer(
- std::shared_ptr<BaseRemoteController> rc,
boost::property_tree::ptree pt) :
RemoteControllable("mux"),
m_pt(pt),
- m_rc(rc),
timestamp(0),
MNSC_increment_time(false),
sync(0x49C5F8),
@@ -79,8 +77,8 @@ DabMultiplexer::DabMultiplexer(
ensemble(std::make_shared<dabEnsemble>()),
fig_carousel(ensemble)
{
- RC_ADD_PARAMETER(carousel,
- "Set to 1 to use the new carousel");
+ RC_ADD_PARAMETER(frames,
+ "Show number of frames generated [read-only]");
}
void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf)
@@ -135,10 +133,10 @@ void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf)
// Run a set of checks on the configuration
void DabMultiplexer::prepare()
{
- parse_ptree(m_pt, ensemble, m_rc);
+ parse_ptree(m_pt, ensemble);
- this->enrol_at(m_rc);
- ensemble->enrol_at(m_rc);
+ rcs.enrol(this);
+ rcs.enrol(ensemble.get());
prepare_subchannels();
prepare_services_components();
@@ -236,7 +234,7 @@ void DabMultiplexer::prepare_services_components()
throw MuxInitException();
}
- service->enrol_at(m_rc);
+ rcs.enrol(service.get());
// Adjust components type for DAB+
while (component != ensemble->components.end()) {
@@ -296,8 +294,7 @@ void DabMultiplexer::prepare_services_components()
component->packet.id = cur_packetid++;
- component->enrol_at(m_rc);
-
+ rcs.enrol(component);
}
}
@@ -796,7 +793,12 @@ void DabMultiplexer::print_info(void)
void DabMultiplexer::set_parameter(const std::string& parameter,
const std::string& value)
{
- if (0) {
+ if (parameter == "frames") {
+ stringstream ss;
+ ss << "Parameter '" << parameter <<
+ "' of " << get_rc_name() <<
+ " is read-only";
+ throw ParameterError(ss.str());
}
else {
stringstream ss;
@@ -810,7 +812,8 @@ void DabMultiplexer::set_parameter(const std::string& parameter,
const std::string DabMultiplexer::get_parameter(const std::string& parameter) const
{
stringstream ss;
- if (0) {
+ if (parameter == "frames") {
+ ss << currentFrame;
}
else {
ss << "Parameter '" << parameter <<
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index e069da5..0d008be 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -55,7 +55,6 @@
class DabMultiplexer : public RemoteControllable {
public:
DabMultiplexer(
- std::shared_ptr<BaseRemoteController> rc,
boost::property_tree::ptree pt);
void prepare(void);
@@ -80,7 +79,6 @@ class DabMultiplexer : public RemoteControllable {
void prepare_data_inputs(void);
boost::property_tree::ptree m_pt;
- std::shared_ptr<BaseRemoteController> m_rc;
unsigned timestamp;
bool MNSC_increment_time;
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 689b762..aefa701 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -135,8 +135,6 @@ using namespace std;
using boost::property_tree::ptree;
using boost::property_tree::ptree_error;
-
-
volatile sig_atomic_t running = 1;
/* We are not allowed to use etiLog in the signal handler,
@@ -275,17 +273,18 @@ int main(int argc, char *argv[])
/************** READ REMOTE CONTROL PARAMETERS *************/
int telnetport = pt.get<int>("remotecontrol.telnetport", 0);
-
- std::shared_ptr<BaseRemoteController> rc;
-
if (telnetport != 0) {
- rc = std::make_shared<RemoteControllerTelnet>(telnetport);
+ auto rc = std::make_shared<RemoteControllerTelnet>(telnetport);
+ rcs.add_controller(rc);
}
- else {
- rc = std::make_shared<RemoteControllerDummy>();
+
+ auto zmqendpoint = pt.get<string>("remotecontrol.zmqendpoint", "");
+ if (not zmqendpoint.empty()) {
+ auto rc = std::make_shared<RemoteControllerZmq>(zmqendpoint);
+ rcs.add_controller(rc);
}
- DabMultiplexer mux(rc, pt);
+ DabMultiplexer mux(pt);
etiLog.level(info) <<
PACKAGE_NAME << " " <<
@@ -460,9 +459,8 @@ int main(int argc, char *argv[])
}
/* Check every six seconds if the remote control is still working */
- if ((currentFrame % 250 == 249) && rc->fault_detected()) {
- etiLog.level(warn) << "Detected Remote Control fault, restarting it";
- rc->restart();
+ if (currentFrame % 250 == 249) {
+ rcs.check_faults();
}
/* Same for statistics server */
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 9ecb018..12ab84e 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -27,145 +27,131 @@
#include <iostream>
#include <string>
#include <boost/asio.hpp>
-#include <boost/bind.hpp>
#include <boost/thread.hpp>
-#include "Log.h"
#include "RemoteControl.h"
using boost::asio::ip::tcp;
using namespace std;
-RemoteControllerTelnet::~RemoteControllerTelnet()
-{
- m_running = false;
- m_io_service.stop();
- m_child_thread.join();
-}
+RemoteControllers rcs;
void RemoteControllerTelnet::restart()
{
- m_restarter_thread = boost::thread(&RemoteControllerTelnet::restart_thread,
+ m_restarter_thread = boost::thread(
+ &RemoteControllerTelnet::restart_thread,
this, 0);
}
+RemoteControllable::~RemoteControllable() {
+ rcs.remove_controllable(this);
+}
+
+std::list<std::string> RemoteControllable::get_supported_parameters() const {
+ std::list<std::string> parameterlist;
+ for (const auto& param : m_parameters) {
+ parameterlist.push_back(param[0]);
+ }
+ return parameterlist;
+}
+
+RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) {
+ auto rc = std::find_if(controllables.begin(), controllables.end(),
+ [&](RemoteControllable* r) { return r->get_rc_name() == name; });
+
+ if (rc == controllables.end()) {
+ throw ParameterError("Module name unknown");
+ }
+ else {
+ return *rc;
+ }
+}
+
// This runs in a separate thread, because
// it would take too long to be done in the main loop
// thread.
void RemoteControllerTelnet::restart_thread(long)
{
m_running = false;
- m_io_service.stop();
- m_child_thread.join();
+ if (m_port) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0);
}
-void RemoteControllerTelnet::handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor)
+void RemoteControllerTelnet::process(long)
{
-
- const std::string welcome = "ODR-DabMux Remote Control CLI\n"
- "Write 'help' for help.\n"
- "**********\n";
- const std::string prompt = "> ";
+ std::string m_welcome = "ODR-DabMux Remote Control CLI\n"
+ "Write 'help' for help.\n"
+ "**********\n";
+ std::string m_prompt = "> ";
std::string in_message;
size_t length;
- if (boost_error)
- {
- etiLog.level(error) << "RC: Error accepting connection";
- return;
- }
-
try {
- etiLog.level(info) << "RC: Accepted";
-
- boost::system::error_code ignored_error;
-
- boost::asio::write(*socket, boost::asio::buffer(welcome),
- boost::asio::transfer_all(),
- ignored_error);
-
- while (m_running && in_message != "quit") {
- boost::asio::write(*socket, boost::asio::buffer(prompt),
- boost::asio::transfer_all(),
- ignored_error);
+ boost::asio::io_service io_service;
+ tcp::acceptor acceptor(io_service, tcp::endpoint(
+ boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
+ while (m_running) {
in_message = "";
- boost::asio::streambuf buffer;
- length = boost::asio::read_until(*socket, buffer, "\n", ignored_error);
+ tcp::socket socket(io_service);
- std::istream str(&buffer);
- std::getline(str, in_message);
+ acceptor.accept(socket);
- if (length == 0) {
- etiLog.level(info) << "RC: Connection terminated";
- break;
- }
+ boost::system::error_code ignored_error;
- while (in_message.length() > 0 &&
- (in_message[in_message.length()-1] == '\r' ||
- in_message[in_message.length()-1] == '\n')) {
- in_message.erase(in_message.length()-1, 1);
- }
+ boost::asio::write(socket, boost::asio::buffer(m_welcome),
+ boost::asio::transfer_all(),
+ ignored_error);
- if (in_message.length() == 0) {
- continue;
- }
+ while (m_running && in_message != "quit") {
+ boost::asio::write(socket, boost::asio::buffer(m_prompt),
+ boost::asio::transfer_all(),
+ ignored_error);
- etiLog.level(info) << "RC: Got message '" << in_message << "'";
+ in_message = "";
- dispatch_command(*socket, in_message);
- }
- etiLog.level(info) << "RC: Closing socket";
- socket->close();
- }
- catch (std::exception& e)
- {
- etiLog.level(error) << "Remote control caught exception: " << e.what();
- }
-}
+ boost::asio::streambuf buffer;
+ length = boost::asio::read_until( socket, buffer, "\n", ignored_error);
-void RemoteControllerTelnet::process(long)
-{
- m_running = true;
-
- while (m_running) {
- m_io_service.reset();
+ std::istream str(&buffer);
+ std::getline(str, in_message);
- tcp::acceptor acceptor(m_io_service, tcp::endpoint(
- boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
+ if (length == 0) {
+ std::cerr << "RC: Connection terminated" << std::endl;
+ break;
+ }
+ while (in_message.length() > 0 &&
+ (in_message[in_message.length()-1] == '\r' ||
+ in_message[in_message.length()-1] == '\n')) {
+ in_message.erase(in_message.length()-1, 1);
+ }
- // Add a job to start accepting connections.
- boost::shared_ptr<tcp::socket> socket(
- new tcp::socket(acceptor.get_io_service()));
+ if (in_message.length() == 0) {
+ continue;
+ }
- // Add an accept call to the service. This will prevent io_service::run()
- // from returning.
- etiLog.level(info) << "RC: Waiting for connection on port " << m_port;
- acceptor.async_accept(*socket,
- boost::bind(&RemoteControllerTelnet::handle_accept,
- this,
- boost::asio::placeholders::error,
- socket,
- boost::ref(acceptor)));
+ std::cerr << "RC: Got message '" << in_message << "'" << std::endl;
- // Process event loop.
- m_io_service.run();
+ dispatch_command(socket, in_message);
+ }
+ std::cerr << "RC: Closing socket" << std::endl;
+ socket.close();
+ }
+ }
+ catch (std::exception& e) {
+ std::cerr << "Remote control caught exception: " << e.what() << std::endl;
+ m_fault = true;
}
-
- etiLog.level(info) << "RC: Leaving";
- m_fault = true;
}
-
void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
{
vector<string> cmd = tokenise_(command);
@@ -189,16 +175,12 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
stringstream ss;
if (cmd.size() == 1) {
- for (list<RemoteControllable*>::iterator it = m_cohort.begin();
- it != m_cohort.end(); ++it) {
- ss << (*it)->get_rc_name() << endl;
-
- list< vector<string> >::iterator param;
- list< vector<string> > params = (*it)->get_parameter_descriptions();
- for (param = params.begin();
- param != params.end();
- ++param) {
- ss << "\t" << (*param)[0] << " : " << (*param)[1] << endl;
+ for (auto &controllable : rcs.controllables) {
+ ss << controllable->get_rc_name() << endl;
+
+ list< vector<string> > params = controllable->get_parameter_descriptions();
+ for (auto &param : params) {
+ ss << "\t" << param[0] << " : " << param[1] << endl;
}
}
}
@@ -212,10 +194,9 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
if (cmd.size() == 2) {
try {
stringstream ss;
- list< vector<string> > r = get_param_list_values_(cmd[1]);
- for (list< vector<string> >::iterator it = r.begin();
- it != r.end(); ++it) {
- ss << (*it)[0] << ": " << (*it)[1] << endl;
+ list< vector<string> > r = rcs.get_param_list_values(cmd[1]);
+ for (auto &param_val : r) {
+ ss << param_val[0] << ": " << param_val[1] << endl;
}
reply(socket, ss.str());
@@ -224,23 +205,21 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
reply(socket, e.what());
}
}
- else
- {
+ else {
reply(socket, "Incorrect parameters for command 'show'");
}
}
else if (cmd[0] == "get") {
if (cmd.size() == 3) {
try {
- string r = get_param_(cmd[1], cmd[2]);
+ string r = rcs.get_param(cmd[1], cmd[2]);
reply(socket, r);
}
catch (ParameterError &e) {
reply(socket, e.what());
}
}
- else
- {
+ else {
reply(socket, "Incorrect parameters for command 'get'");
}
}
@@ -256,7 +235,7 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
}
}
- set_param_(cmd[1], cmd[2], new_param_value.str());
+ rcs.set_param(cmd[1], cmd[2], new_param_value.str());
reply(socket, "ok");
}
catch (ParameterError &e) {
@@ -288,3 +267,171 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
ignored_error);
}
+
+#if defined(HAVE_RC_ZEROMQ)
+
+void RemoteControllerZmq::restart()
+{
+ m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this);
+}
+
+// This runs in a separate thread, because
+// it would take too long to be done in the main loop
+// thread.
+void RemoteControllerZmq::restart_thread()
+{
+ m_running = false;
+
+ if (!m_endpoint.empty()) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
+
+ m_child_thread = boost::thread(&RemoteControllerZmq::process, this);
+}
+
+void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message)
+{
+ bool more = true;
+ do {
+ zmq::message_t msg;
+ pSocket.recv(&msg);
+ std::string incoming((char*)msg.data(), msg.size());
+ message.push_back(incoming);
+ more = msg.more();
+ } while (more);
+}
+
+void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)
+{
+ zmq::message_t msg(2);
+ char repCode[2] = {'o', 'k'};
+ memcpy ((void*) msg.data(), repCode, 2);
+ pSocket.send(msg, 0);
+}
+
+void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error)
+{
+ zmq::message_t msg1(4);
+ char repCode[4] = {'f', 'a', 'i', 'l'};
+ memcpy ((void*) msg1.data(), repCode, 4);
+ pSocket.send(msg1, ZMQ_SNDMORE);
+
+ zmq::message_t msg2(error.length());
+ memcpy ((void*) msg2.data(), error.c_str(), error.length());
+ pSocket.send(msg2, 0);
+}
+
+void RemoteControllerZmq::process()
+{
+ // create zmq reply socket for receiving ctrl parameters
+ etiLog.level(info) << "Starting zmq remote control thread";
+ try {
+ zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
+
+ // connect the socket
+ int hwm = 100;
+ int linger = 0;
+ repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+ repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
+ repSocket.bind(m_endpoint.c_str());
+
+ // create pollitem that polls the ZMQ sockets
+ zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
+ for (;;) {
+ zmq::poll(pollItems, 1, 100);
+ std::vector<std::string> msg;
+
+ if (pollItems[0].revents & ZMQ_POLLIN) {
+ recv_all(repSocket, msg);
+
+ std::string command((char*)msg[0].data(), msg[0].size());
+
+ if (msg.size() == 1 && command == "ping") {
+ send_ok_reply(repSocket);
+ }
+ else if (msg.size() == 1 && command == "list") {
+ size_t cohort_size = rcs.controllables.size();
+ for (auto &controllable : rcs.controllables) {
+ std::stringstream ss;
+ ss << controllable->get_rc_name();
+
+ std::string msg_s = ss.str();
+
+ zmq::message_t msg(ss.str().size());
+ memcpy ((void*) msg.data(), msg_s.data(), msg_s.size());
+
+ int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0;
+ repSocket.send(msg, flag);
+ }
+ }
+ else if (msg.size() == 2 && command == "show") {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ try {
+ list< vector<string> > r = rcs.get_param_list_values(module);
+ size_t r_size = r.size();
+ for (auto &param_val : r) {
+ std::stringstream ss;
+ ss << param_val[0] << ": " << param_val[1] << endl;
+ zmq::message_t msg(ss.str().size());
+ memcpy(msg.data(), ss.str().data(), ss.str().size());
+
+ int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0;
+ repSocket.send(msg, flag);
+ }
+ }
+ catch (ParameterError &e) {
+ send_fail_reply(repSocket, e.what());
+ }
+ }
+ else if (msg.size() == 3 && command == "get") {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+
+ try {
+ std::string value = rcs.get_param(module, parameter);
+ zmq::message_t msg(value.size());
+ memcpy ((void*) msg.data(), value.data(), value.size());
+ repSocket.send(msg, 0);
+ }
+ catch (ParameterError &err) {
+ send_fail_reply(repSocket, err.what());
+ }
+ }
+ else if (msg.size() == 4 && command == "set") {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ std::string parameter((char*) msg[2].data(), msg[2].size());
+ std::string value((char*) msg[3].data(), msg[3].size());
+
+ try {
+ rcs.set_param(module, parameter, value);
+ send_ok_reply(repSocket);
+ }
+ catch (ParameterError &err) {
+ send_fail_reply(repSocket, err.what());
+ }
+ }
+ else {
+ send_fail_reply(repSocket,
+ "Unsupported command. commands: list, show, get, set");
+ }
+ }
+
+ // check if thread is interrupted
+ boost::this_thread::interruption_point();
+ }
+ repSocket.close();
+ }
+ catch (boost::thread_interrupted&) {}
+ catch (zmq::error_t &e) {
+ etiLog.level(error) << "ZMQ RC error: " << std::string(e.what());
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) << "ZMQ RC caught exception: " << e.what();
+ m_fault = true;
+ }
+}
+
+#endif
+
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index e7bb7fe..c682826 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -3,10 +3,10 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
- This module adds remote-control capability to some of the dabmod modules.
+ This module adds remote-control capability to some of the dabmux modules.
see testremotecontrol/test.cpp for an example of how to use this.
*/
/*
@@ -26,8 +26,15 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef _REMOTECONTROL_H
-#define _REMOTECONTROL_H
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#if defined(HAVE_RC_ZEROMQ)
+# include "zmq.hpp"
+#endif
#include <list>
#include <map>
@@ -36,14 +43,13 @@
#include <atomic>
#include <iostream>
#include <boost/bind.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/foreach.hpp>
#include <boost/tokenizer.hpp>
#include <boost/thread.hpp>
#include <stdexcept>
+#include "Log.h"
#define RC_ADD_PARAMETER(p, desc) { \
std::vector<std::string> p; \
@@ -52,7 +58,6 @@
m_parameters.push_back(p); \
}
-
class ParameterError : public std::exception
{
public:
@@ -71,9 +76,6 @@ class RemoteControllable;
*/
class BaseRemoteController {
public:
- /* Add a new controllable under this controller's command */
- virtual void enrol(RemoteControllable* controllable) = 0;
-
/* When this returns one, the remote controller cannot be
* used anymore, and must be restarted by dabmux
*/
@@ -90,10 +92,13 @@ class BaseRemoteController {
/* Objects that support remote control must implement the following class */
class RemoteControllable {
public:
+ RemoteControllable(const std::string& name) :
+ m_name(name) {}
- RemoteControllable(std::string name) : m_name(name) {}
+ RemoteControllable(const RemoteControllable& other) = delete;
+ RemoteControllable& operator=(const RemoteControllable& other) = delete;
- virtual ~RemoteControllable() {}
+ virtual ~RemoteControllable();
/* return a short name used to identify the controllable.
* It might be used in the commands the user has to type, so keep
@@ -101,33 +106,19 @@ class RemoteControllable {
*/
virtual std::string get_rc_name() const { return m_name; }
- /* Tell the controllable to enrol at the given controller */
- virtual void enrol_at(BaseRemoteController& controller) {
- controller.enrol(this);
- }
-
- virtual void enrol_at(std::shared_ptr<BaseRemoteController> controller) {
- controller->enrol(this);
- }
-
/* Return a list of possible parameters that can be set */
- virtual std::list<std::string> get_supported_parameters() const {
- std::list<std::string> parameterlist;
- for (std::list< std::vector<std::string> >::const_iterator it = m_parameters.begin();
- it != m_parameters.end(); ++it) {
- parameterlist.push_back((*it)[0]);
- }
- return parameterlist;
- }
+ virtual std::list<std::string> get_supported_parameters() const;
/* Return a mapping of the descriptions of all parameters */
virtual std::list< std::vector<std::string> >
- get_parameter_descriptions() const {
- return m_parameters;
- }
+ get_parameter_descriptions() const
+ {
+ return m_parameters;
+ }
/* Base function to set parameters. */
- virtual void set_parameter(const std::string& parameter,
+ virtual void set_parameter(
+ const std::string& parameter,
const std::string& value) = 0;
/* Getting a parameter always returns a string. */
@@ -138,30 +129,93 @@ class RemoteControllable {
std::list< std::vector<std::string> > m_parameters;
};
+/* Holds all our remote controllers and controlled object.
+ */
+class RemoteControllers {
+ public:
+ void add_controller(std::shared_ptr<BaseRemoteController> rc) {
+ m_controllers.push_back(rc);
+ }
+
+ void enrol(RemoteControllable *rc) {
+ controllables.push_back(rc);
+ }
+
+ void remove_controllable(RemoteControllable *rc) {
+ controllables.remove(rc);
+ }
+
+ void check_faults() {
+ for (auto &controller : m_controllers) {
+ if (controller->fault_detected())
+ {
+ etiLog.level(warn) <<
+ "Detected Remote Control fault, restarting it";
+ controller->restart();
+ }
+ }
+ }
+
+ std::list< std::vector<std::string> >
+ get_param_list_values(const std::string& name) {
+ RemoteControllable* controllable = get_controllable_(name);
+
+ std::list< std::vector<std::string> > allparams;
+ for (auto &param : controllable->get_supported_parameters()) {
+ std::vector<std::string> item;
+ item.push_back(param);
+ item.push_back(controllable->get_parameter(param));
+
+ allparams.push_back(item);
+ }
+ return allparams;
+ }
+
+ std::string get_param(const std::string& name, const std::string& param) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->get_parameter(param);
+ }
+
+ void set_param(const std::string& name, const std::string& param, const std::string& value) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->set_parameter(param, value);
+ }
+
+ std::list<RemoteControllable*> controllables;
+
+ private:
+ RemoteControllable* get_controllable_(const std::string& name);
+
+ std::list<std::shared_ptr<BaseRemoteController> > m_controllers;
+};
+
+extern RemoteControllers rcs;
+
+
/* Implements a Remote controller based on a simple telnet CLI
* that listens on localhost
*/
class RemoteControllerTelnet : public BaseRemoteController {
public:
- RemoteControllerTelnet() :
- m_running(false),
- m_io_service(),
- m_fault(false),
+ RemoteControllerTelnet()
+ : m_running(false), m_fault(false),
m_port(0) { }
- RemoteControllerTelnet(int port) :
- m_running(false),
- m_io_service(),
- m_fault(false),
- m_port(port)
- {
- restart();
- }
+ RemoteControllerTelnet(int port)
+ : m_running(true), m_fault(false),
+ m_child_thread(&RemoteControllerTelnet::process, this, 0),
+ m_port(port) { }
- ~RemoteControllerTelnet();
+ RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete;
+ RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete;
- void enrol(RemoteControllable* controllable) {
- m_cohort.push_back(controllable);
+ ~RemoteControllerTelnet() {
+ m_running = false;
+ m_fault = false;
+ if (m_port) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
}
virtual bool fault_detected() { return m_fault; }
@@ -178,14 +232,6 @@ class RemoteControllerTelnet : public BaseRemoteController {
void reply(boost::asio::ip::tcp::socket& socket, std::string message);
- void handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor);
-
- RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other);
- RemoteControllerTelnet(const RemoteControllerTelnet& other);
-
std::vector<std::string> tokenise_(std::string message) {
std::vector<std::string> all_tokens;
@@ -197,82 +243,68 @@ class RemoteControllerTelnet : public BaseRemoteController {
return all_tokens;
}
- RemoteControllable* get_controllable_(std::string name) {
- for (std::list<RemoteControllable*>::iterator it = m_cohort.begin();
- it != m_cohort.end(); ++it) {
- if ((*it)->get_rc_name() == name)
- {
- return *it;
- }
- }
- throw ParameterError("Module name unknown");
- }
-
- std::list< std::vector<std::string> >
- get_parameter_descriptions_(std::string name) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->get_parameter_descriptions();
- }
+ std::atomic<bool> m_running;
- std::list<std::string> get_param_list_(std::string name) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->get_supported_parameters();
- }
+ /* This is set to true if a fault occurred */
+ std::atomic<bool> m_fault;
+ boost::thread m_restarter_thread;
- std::list< std::vector<std::string> >
- get_param_list_values_(std::string name) {
- RemoteControllable* controllable = get_controllable_(name);
+ boost::thread m_child_thread;
- std::list< std::vector<std::string> > allparams;
- std::list<std::string> params = controllable->get_supported_parameters();
- for (std::list<std::string>::iterator it = params.begin();
- it != params.end(); ++it) {
- std::vector<std::string> item;
- item.push_back(*it);
- item.push_back(controllable->get_parameter(*it));
+ int m_port;
+};
- allparams.push_back(item);
+#if defined(HAVE_RC_ZEROMQ)
+/* Implements a Remote controller using zmq transportlayer
+ * that listens on localhost
+ */
+class RemoteControllerZmq : public BaseRemoteController {
+ public:
+ RemoteControllerZmq()
+ : m_running(false), m_fault(false),
+ m_zmqContext(1),
+ m_endpoint("") { }
+
+ RemoteControllerZmq(const std::string& endpoint)
+ : m_running(true), m_fault(false),
+ m_zmqContext(1),
+ m_endpoint(endpoint),
+ m_child_thread(&RemoteControllerZmq::process, this) { }
+
+ RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete;
+ RemoteControllerZmq(const RemoteControllerZmq& other) = delete;
+
+ ~RemoteControllerZmq() {
+ m_running = false;
+ m_fault = false;
+ if (!m_endpoint.empty()) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
}
- return allparams;
}
- std::string get_param_(std::string name, std::string param) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->get_parameter(param);
- }
+ virtual bool fault_detected() { return m_fault; }
- void set_param_(std::string name, std::string param, std::string value) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->set_parameter(param, value);
- }
+ virtual void restart();
- std::atomic<bool> m_running;
+ private:
+ void restart_thread();
+
+ void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message);
+ void send_ok_reply(zmq::socket_t &pSocket);
+ void send_fail_reply(zmq::socket_t &pSocket, const std::string &error);
+ void process();
- boost::asio::io_service m_io_service;
+ std::atomic<bool> m_running;
/* This is set to true if a fault occurred */
std::atomic<bool> m_fault;
boost::thread m_restarter_thread;
- boost::thread m_child_thread;
-
- /* This controller commands the controllables in the cohort */
- std::list<RemoteControllable*> m_cohort;
-
- int m_port;
-};
-
+ zmq::context_t m_zmqContext;
-/* The Dummy remote controller does nothing, and never fails
- */
-class RemoteControllerDummy : public BaseRemoteController {
- public:
- void enrol(RemoteControllable*) {}
-
- bool fault_detected() { return false; }
-
- virtual void restart() {}
+ std::string m_endpoint;
+ boost::thread m_child_thread;
};
-
#endif