From 0a17f5cf27e221b680617e2c522306d27bd6f141 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 1 Aug 2015 15:37:48 +0200 Subject: Refactor ZMQ RC and add list and show commands --- src/RemoteControl.cpp | 98 +++++++++++++++++++++++++++++++++++++-------------- src/RemoteControl.h | 32 +++++++++++++---- 2 files changed, 96 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 21a6c81..e291aa0 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -272,47 +272,47 @@ void RemoteControllerZmq::restart_thread() m_child_thread = boost::thread(&RemoteControllerZmq::process, this); } -void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector &message) +void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector &message) { - int more = -1; - size_t more_size = sizeof(more); - - while (more != 0) + bool more = true; + do { zmq::message_t msg; - pSocket->recv(&msg); - message.push_back(std::string((char*)msg.data(), msg.size())); - pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); - } + pSocket.recv(&msg); + std::string incoming((char*)msg.data(), msg.size()); + message.push_back(incoming); + more = msg.more(); + } while (more); } -void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket) +void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) { zmq::message_t msg(2); char repCode[2] = {'o', 'k'}; memcpy ((void*) msg.data(), repCode, 2); - pSocket->send(msg, 0); + pSocket.send(msg, 0); } -void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error) +void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) { zmq::message_t msg1(4); char repCode[4] = {'f', 'a', 'i', 'l'}; memcpy ((void*) msg1.data(), repCode, 4); - pSocket->send(msg1, ZMQ_SNDMORE); + pSocket.send(msg1, ZMQ_SNDMORE); zmq::message_t msg2(error.length()); memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket->send(msg2, 0); + pSocket.send(msg2, 0); } void RemoteControllerZmq::process() { // create zmq reply socket for receiving ctrl parameters - zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); - std::cerr << "Starting zmq remote control thread" << std::endl; + etiLog.level(info) << "Starting zmq remote control thread"; try { + zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + // connect the socket int hwm = 100; int linger = 0; @@ -327,14 +327,55 @@ void RemoteControllerZmq::process() { zmq::poll(pollItems, 1, 100); std::vector msg; + if (pollItems[0].revents & ZMQ_POLLIN) { - recv_all(&repSocket, msg); + recv_all(repSocket, msg); + std::string command((char*)msg[0].data(), msg[0].size()); if (msg.size() == 1 && command == "ping") { - send_ok_reply(&repSocket); + send_ok_reply(repSocket); + } + else if (msg.size() == 1 && command == "list") + { + size_t cohort_size = m_cohort.size(); + for (list::iterator it = m_cohort.begin(); + it != m_cohort.end(); ++it) { + std::stringstream ss; + ss << (*it)->get_rc_name(); + + std::string msg_s = ss.str(); + + zmq::message_t msg(ss.str().size()); + memcpy ((void*) msg.data(), msg_s.data(), msg_s.size()); + + int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; + repSocket.send(msg, flag); + } + } + else if (msg.size() == 2 && command == "show") + { + std::string module((char*) msg[1].data(), msg[1].size()); + try { + list< vector > r = get_param_list_values_(module); + size_t r_size = r.size(); + for (list< vector >::iterator it = r.begin(); + it != r.end(); ++it) { + + std::stringstream ss; + ss << (*it)[0] << ": " << (*it)[1] << endl; + zmq::message_t msg(ss.str().size()); + memcpy(msg.data(), ss.str().data(), ss.str().size()); + + int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; + repSocket.send(msg, flag); + } + } + catch (ParameterError &e) { + send_fail_reply(repSocket, e.what()); + } } else if (msg.size() == 3 && command == "get") { @@ -346,11 +387,11 @@ void RemoteControllerZmq::process() std::string value = get_param_(module, parameter); zmq::message_t msg(value.size()); memcpy ((void*) msg.data(), value.data(), value.size()); - repSocket.send(&msg, 0); + repSocket.send(msg, 0); } catch (ParameterError &err) { - send_fail_reply(&repSocket, err.what()); + send_fail_reply(repSocket, err.what()); } } else if (msg.size() == 4 && command == "set") @@ -362,32 +403,35 @@ void RemoteControllerZmq::process() try { set_param_(module, parameter, value); - send_ok_reply(&repSocket); + send_ok_reply(repSocket); } catch (ParameterError &err) { - send_fail_reply(&repSocket, err.what()); + send_fail_reply(repSocket, err.what()); } } - else - send_fail_reply(&repSocket, "Unsupported command"); + else { + send_fail_reply(repSocket, + "Unsupported command. commands: list, show, get, set"); + } } // check if thread is interrupted boost::this_thread::interruption_point(); } + repSocket.close(); } catch (boost::thread_interrupted&) {} catch (zmq::error_t &e) { - std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); } catch (std::exception& e) { - std::cerr << "Remote control caught exception: " << e.what() << std::endl; + etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); m_fault = true; } - repSocket.close(); } + #endif diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 1b5e447..0a4848f 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -306,9 +306,9 @@ class RemoteControllerZmq : public BaseRemoteController { RemoteControllerZmq(std::string endpoint) : m_running(true), m_fault(false), - m_child_thread(&RemoteControllerZmq::process, this), m_zmqContext(1), - m_endpoint(endpoint) { } + m_endpoint(endpoint), + m_child_thread(&RemoteControllerZmq::process, this) { } ~RemoteControllerZmq() { m_running = false; @@ -330,9 +330,9 @@ class RemoteControllerZmq : public BaseRemoteController { private: void restart_thread(); - void recv_all(zmq::socket_t* pSocket, std::vector &message); - void send_ok_reply(zmq::socket_t *pSocket); - void send_fail_reply(zmq::socket_t *pSocket, const std::string &error); + void recv_all(zmq::socket_t &pSocket, std::vector &message); + void send_ok_reply(zmq::socket_t &pSocket); + void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); void process(); @@ -360,19 +360,37 @@ class RemoteControllerZmq : public BaseRemoteController { return controllable->set_parameter(param, value); } + std::list< std::vector > + get_param_list_values_(std::string name) { + RemoteControllable* controllable = get_controllable_(name); + + std::list< std::vector > allparams; + std::list params = controllable->get_supported_parameters(); + for (std::list::iterator it = params.begin(); + it != params.end(); ++it) { + std::vector item; + item.push_back(*it); + item.push_back(controllable->get_parameter(*it)); + + allparams.push_back(item); + } + return allparams; + } + + bool m_running; /* This is set to true if a fault occurred */ bool m_fault; boost::thread m_restarter_thread; - boost::thread m_child_thread; + zmq::context_t m_zmqContext; /* This controller commands the controllables in the cohort */ std::list m_cohort; - zmq::context_t m_zmqContext; std::string m_endpoint; + boost::thread m_child_thread; }; #endif -- cgit v1.2.3