diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-08-01 15:37:48 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-08-01 15:37:48 +0200 |
commit | 0a17f5cf27e221b680617e2c522306d27bd6f141 (patch) | |
tree | 58cc2be79f223cb9bbe3de1afe460eb1af7e58e6 /src/RemoteControl.cpp | |
parent | 98b07e76c56bced3f6466d87941ed3409ea22d5f (diff) | |
download | dabmod-0a17f5cf27e221b680617e2c522306d27bd6f141.tar.gz dabmod-0a17f5cf27e221b680617e2c522306d27bd6f141.tar.bz2 dabmod-0a17f5cf27e221b680617e2c522306d27bd6f141.zip |
Refactor ZMQ RC and add list and show commands
Diffstat (limited to 'src/RemoteControl.cpp')
-rw-r--r-- | src/RemoteControl.cpp | 98 |
1 files changed, 71 insertions, 27 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 21a6c81..e291aa0 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -272,47 +272,47 @@ void RemoteControllerZmq::restart_thread() m_child_thread = boost::thread(&RemoteControllerZmq::process, this); } -void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message) +void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) { - int more = -1; - size_t more_size = sizeof(more); - - while (more != 0) + bool more = true; + do { zmq::message_t msg; - pSocket->recv(&msg); - message.push_back(std::string((char*)msg.data(), msg.size())); - pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); - } + pSocket.recv(&msg); + std::string incoming((char*)msg.data(), msg.size()); + message.push_back(incoming); + more = msg.more(); + } while (more); } -void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket) +void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) { zmq::message_t msg(2); char repCode[2] = {'o', 'k'}; memcpy ((void*) msg.data(), repCode, 2); - pSocket->send(msg, 0); + pSocket.send(msg, 0); } -void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error) +void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) { zmq::message_t msg1(4); char repCode[4] = {'f', 'a', 'i', 'l'}; memcpy ((void*) msg1.data(), repCode, 4); - pSocket->send(msg1, ZMQ_SNDMORE); + pSocket.send(msg1, ZMQ_SNDMORE); zmq::message_t msg2(error.length()); memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket->send(msg2, 0); + pSocket.send(msg2, 0); } void RemoteControllerZmq::process() { // create zmq reply socket for receiving ctrl parameters - zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); - std::cerr << "Starting zmq remote control thread" << std::endl; + etiLog.level(info) << "Starting zmq remote control thread"; try { + zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + // connect the socket int hwm = 100; int linger = 0; @@ -327,14 +327,55 @@ void RemoteControllerZmq::process() { zmq::poll(pollItems, 1, 100); std::vector<std::string> msg; + if (pollItems[0].revents & ZMQ_POLLIN) { - recv_all(&repSocket, msg); + recv_all(repSocket, msg); + std::string command((char*)msg[0].data(), msg[0].size()); if (msg.size() == 1 && command == "ping") { - send_ok_reply(&repSocket); + send_ok_reply(repSocket); + } + else if (msg.size() == 1 && command == "list") + { + size_t cohort_size = m_cohort.size(); + for (list<RemoteControllable*>::iterator it = m_cohort.begin(); + it != m_cohort.end(); ++it) { + std::stringstream ss; + ss << (*it)->get_rc_name(); + + std::string msg_s = ss.str(); + + zmq::message_t msg(ss.str().size()); + memcpy ((void*) msg.data(), msg_s.data(), msg_s.size()); + + int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; + repSocket.send(msg, flag); + } + } + else if (msg.size() == 2 && command == "show") + { + std::string module((char*) msg[1].data(), msg[1].size()); + try { + list< vector<string> > r = get_param_list_values_(module); + size_t r_size = r.size(); + for (list< vector<string> >::iterator it = r.begin(); + it != r.end(); ++it) { + + std::stringstream ss; + ss << (*it)[0] << ": " << (*it)[1] << endl; + zmq::message_t msg(ss.str().size()); + memcpy(msg.data(), ss.str().data(), ss.str().size()); + + int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; + repSocket.send(msg, flag); + } + } + catch (ParameterError &e) { + send_fail_reply(repSocket, e.what()); + } } else if (msg.size() == 3 && command == "get") { @@ -346,11 +387,11 @@ void RemoteControllerZmq::process() std::string value = get_param_(module, parameter); zmq::message_t msg(value.size()); memcpy ((void*) msg.data(), value.data(), value.size()); - repSocket.send(&msg, 0); + repSocket.send(msg, 0); } catch (ParameterError &err) { - send_fail_reply(&repSocket, err.what()); + send_fail_reply(repSocket, err.what()); } } else if (msg.size() == 4 && command == "set") @@ -362,32 +403,35 @@ void RemoteControllerZmq::process() try { set_param_(module, parameter, value); - send_ok_reply(&repSocket); + send_ok_reply(repSocket); } catch (ParameterError &err) { - send_fail_reply(&repSocket, err.what()); + send_fail_reply(repSocket, err.what()); } } - else - send_fail_reply(&repSocket, "Unsupported command"); + else { + send_fail_reply(repSocket, + "Unsupported command. commands: list, show, get, set"); + } } // check if thread is interrupted boost::this_thread::interruption_point(); } + repSocket.close(); } catch (boost::thread_interrupted&) {} catch (zmq::error_t &e) { - std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); } catch (std::exception& e) { - std::cerr << "Remote control caught exception: " << e.what() << std::endl; + etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); m_fault = true; } - repSocket.close(); } + #endif |