aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-08-01 15:37:48 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-08-01 15:37:48 +0200
commit0a17f5cf27e221b680617e2c522306d27bd6f141 (patch)
tree58cc2be79f223cb9bbe3de1afe460eb1af7e58e6
parent98b07e76c56bced3f6466d87941ed3409ea22d5f (diff)
downloaddabmod-0a17f5cf27e221b680617e2c522306d27bd6f141.tar.gz
dabmod-0a17f5cf27e221b680617e2c522306d27bd6f141.tar.bz2
dabmod-0a17f5cf27e221b680617e2c522306d27bd6f141.zip
Refactor ZMQ RC and add list and show commands
-rw-r--r--src/RemoteControl.cpp98
-rw-r--r--src/RemoteControl.h32
2 files changed, 96 insertions, 34 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 21a6c81..e291aa0 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -272,47 +272,47 @@ void RemoteControllerZmq::restart_thread()
m_child_thread = boost::thread(&RemoteControllerZmq::process, this);
}
-void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message)
+void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message)
{
- int more = -1;
- size_t more_size = sizeof(more);
-
- while (more != 0)
+ bool more = true;
+ do
{
zmq::message_t msg;
- pSocket->recv(&msg);
- message.push_back(std::string((char*)msg.data(), msg.size()));
- pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size);
- }
+ pSocket.recv(&msg);
+ std::string incoming((char*)msg.data(), msg.size());
+ message.push_back(incoming);
+ more = msg.more();
+ } while (more);
}
-void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket)
+void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)
{
zmq::message_t msg(2);
char repCode[2] = {'o', 'k'};
memcpy ((void*) msg.data(), repCode, 2);
- pSocket->send(msg, 0);
+ pSocket.send(msg, 0);
}
-void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error)
+void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error)
{
zmq::message_t msg1(4);
char repCode[4] = {'f', 'a', 'i', 'l'};
memcpy ((void*) msg1.data(), repCode, 4);
- pSocket->send(msg1, ZMQ_SNDMORE);
+ pSocket.send(msg1, ZMQ_SNDMORE);
zmq::message_t msg2(error.length());
memcpy ((void*) msg2.data(), error.c_str(), error.length());
- pSocket->send(msg2, 0);
+ pSocket.send(msg2, 0);
}
void RemoteControllerZmq::process()
{
// create zmq reply socket for receiving ctrl parameters
- zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
- std::cerr << "Starting zmq remote control thread" << std::endl;
+ etiLog.level(info) << "Starting zmq remote control thread";
try
{
+ zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
+
// connect the socket
int hwm = 100;
int linger = 0;
@@ -327,14 +327,55 @@ void RemoteControllerZmq::process()
{
zmq::poll(pollItems, 1, 100);
std::vector<std::string> msg;
+
if (pollItems[0].revents & ZMQ_POLLIN)
{
- recv_all(&repSocket, msg);
+ recv_all(repSocket, msg);
+
std::string command((char*)msg[0].data(), msg[0].size());
if (msg.size() == 1 && command == "ping")
{
- send_ok_reply(&repSocket);
+ send_ok_reply(repSocket);
+ }
+ else if (msg.size() == 1 && command == "list")
+ {
+ size_t cohort_size = m_cohort.size();
+ for (list<RemoteControllable*>::iterator it = m_cohort.begin();
+ it != m_cohort.end(); ++it) {
+ std::stringstream ss;
+ ss << (*it)->get_rc_name();
+
+ std::string msg_s = ss.str();
+
+ zmq::message_t msg(ss.str().size());
+ memcpy ((void*) msg.data(), msg_s.data(), msg_s.size());
+
+ int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0;
+ repSocket.send(msg, flag);
+ }
+ }
+ else if (msg.size() == 2 && command == "show")
+ {
+ std::string module((char*) msg[1].data(), msg[1].size());
+ try {
+ list< vector<string> > r = get_param_list_values_(module);
+ size_t r_size = r.size();
+ for (list< vector<string> >::iterator it = r.begin();
+ it != r.end(); ++it) {
+
+ std::stringstream ss;
+ ss << (*it)[0] << ": " << (*it)[1] << endl;
+ zmq::message_t msg(ss.str().size());
+ memcpy(msg.data(), ss.str().data(), ss.str().size());
+
+ int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0;
+ repSocket.send(msg, flag);
+ }
+ }
+ catch (ParameterError &e) {
+ send_fail_reply(repSocket, e.what());
+ }
}
else if (msg.size() == 3 && command == "get")
{
@@ -346,11 +387,11 @@ void RemoteControllerZmq::process()
std::string value = get_param_(module, parameter);
zmq::message_t msg(value.size());
memcpy ((void*) msg.data(), value.data(), value.size());
- repSocket.send(&msg, 0);
+ repSocket.send(msg, 0);
}
catch (ParameterError &err)
{
- send_fail_reply(&repSocket, err.what());
+ send_fail_reply(repSocket, err.what());
}
}
else if (msg.size() == 4 && command == "set")
@@ -362,32 +403,35 @@ void RemoteControllerZmq::process()
try
{
set_param_(module, parameter, value);
- send_ok_reply(&repSocket);
+ send_ok_reply(repSocket);
}
catch (ParameterError &err)
{
- send_fail_reply(&repSocket, err.what());
+ send_fail_reply(repSocket, err.what());
}
}
- else
- send_fail_reply(&repSocket, "Unsupported command");
+ else {
+ send_fail_reply(repSocket,
+ "Unsupported command. commands: list, show, get, set");
+ }
}
// check if thread is interrupted
boost::this_thread::interruption_point();
}
+ repSocket.close();
}
catch (boost::thread_interrupted&) {}
catch (zmq::error_t &e)
{
- std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl;
+ etiLog.level(error) << "ZMQ RC error: " << std::string(e.what());
}
catch (std::exception& e)
{
- std::cerr << "Remote control caught exception: " << e.what() << std::endl;
+ etiLog.level(error) << "ZMQ RC caught exception: " << e.what();
m_fault = true;
}
- repSocket.close();
}
+
#endif
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index 1b5e447..0a4848f 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -306,9 +306,9 @@ class RemoteControllerZmq : public BaseRemoteController {
RemoteControllerZmq(std::string endpoint)
: m_running(true), m_fault(false),
- m_child_thread(&RemoteControllerZmq::process, this),
m_zmqContext(1),
- m_endpoint(endpoint) { }
+ m_endpoint(endpoint),
+ m_child_thread(&RemoteControllerZmq::process, this) { }
~RemoteControllerZmq() {
m_running = false;
@@ -330,9 +330,9 @@ class RemoteControllerZmq : public BaseRemoteController {
private:
void restart_thread();
- void recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message);
- void send_ok_reply(zmq::socket_t *pSocket);
- void send_fail_reply(zmq::socket_t *pSocket, const std::string &error);
+ void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message);
+ void send_ok_reply(zmq::socket_t &pSocket);
+ void send_fail_reply(zmq::socket_t &pSocket, const std::string &error);
void process();
@@ -360,19 +360,37 @@ class RemoteControllerZmq : public BaseRemoteController {
return controllable->set_parameter(param, value);
}
+ std::list< std::vector<std::string> >
+ get_param_list_values_(std::string name) {
+ RemoteControllable* controllable = get_controllable_(name);
+
+ std::list< std::vector<std::string> > allparams;
+ std::list<std::string> params = controllable->get_supported_parameters();
+ for (std::list<std::string>::iterator it = params.begin();
+ it != params.end(); ++it) {
+ std::vector<std::string> item;
+ item.push_back(*it);
+ item.push_back(controllable->get_parameter(*it));
+
+ allparams.push_back(item);
+ }
+ return allparams;
+ }
+
+
bool m_running;
/* This is set to true if a fault occurred */
bool m_fault;
boost::thread m_restarter_thread;
- boost::thread m_child_thread;
+ zmq::context_t m_zmqContext;
/* This controller commands the controllables in the cohort */
std::list<RemoteControllable*> m_cohort;
- zmq::context_t m_zmqContext;
std::string m_endpoint;
+ boost::thread m_child_thread;
};
#endif