diff options
Diffstat (limited to 'src/RemoteControl.cpp')
-rw-r--r-- | src/RemoteControl.cpp | 197 |
1 files changed, 99 insertions, 98 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index c7c5914..6f538dc 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -271,120 +271,121 @@ void RemoteControllerZmq::restart_thread() void RemoteControllerZmq::recv_all(zmq::socket_t* pSocket, std::vector<std::string> &message) { - int more = -1; - size_t more_size = sizeof(more); - - while (more != 0) - { - zmq::message_t msg; - pSocket->recv(&msg); - message.push_back(std::string((char*)msg.data(), msg.size())); - pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); - } + int more = -1; + size_t more_size = sizeof(more); + + while (more != 0) + { + zmq::message_t msg; + pSocket->recv(&msg); + message.push_back(std::string((char*)msg.data(), msg.size())); + pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); + } } void RemoteControllerZmq::send_ok_reply(zmq::socket_t *pSocket) { - zmq::message_t msg(2); - char repCode[2] = {'o', 'k'}; - memcpy ((void*) msg.data(), repCode, 2); - pSocket->send(msg, 0); + zmq::message_t msg(2); + char repCode[2] = {'o', 'k'}; + memcpy ((void*) msg.data(), repCode, 2); + pSocket->send(msg, 0); } void RemoteControllerZmq::send_fail_reply(zmq::socket_t *pSocket, const std::string &error) { - zmq::message_t msg1(4); - char repCode[4] = {'f', 'a', 'i', 'l'}; - memcpy ((void*) msg1.data(), repCode, 4); - pSocket->send(msg1, ZMQ_SNDMORE); - - zmq::message_t msg2(error.length()); - memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket->send(msg2, 0); + zmq::message_t msg1(4); + char repCode[4] = {'f', 'a', 'i', 'l'}; + memcpy ((void*) msg1.data(), repCode, 4); + pSocket->send(msg1, ZMQ_SNDMORE); + + zmq::message_t msg2(error.length()); + memcpy ((void*) msg2.data(), error.c_str(), error.length()); + pSocket->send(msg2, 0); } void RemoteControllerZmq::process() { - // create zmq reply socket for receiving ctrl parameters - zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); - std::cout << "Starting zmq remote control thread" << std::endl; - try - { - // connect the socket - int hwm = 100; - int linger = 0; - repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); - repSocket.bind(m_endpoint.c_str()); - - // create pollitem that polls the ZMQ sockets - zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - for(;;) - { - zmq::poll(pollItems, 1, 100); - std::vector<std::string> msg; - if (pollItems[0].revents & ZMQ_POLLIN) - { - recv_all(&repSocket, msg); - std::string command((char*)msg[0].data(), msg[0].size()); - - if (msg.size() == 1 && command == "ping") - { - send_ok_reply(&repSocket); - } - else if (msg.size() == 3 && command == "get") - { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - - try - { - std::string value = get_param_(module, parameter); - zmq::message_t *pMsg = new zmq::message_t(value.size()); - memcpy ((void*) pMsg->data(), value.data(), value.size()); - repSocket.send(*pMsg, 0); - delete pMsg; - } - catch (ParameterError &err) - { - send_fail_reply(&repSocket, err.what()); - } - } - else if (msg.size() == 4 && command == "set") - { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - std::string value((char*) msg[3].data(), msg[3].size()); - - try - { - set_param_(module, parameter, value); - send_ok_reply(&repSocket); - } - catch (ParameterError &err) - { - send_fail_reply(&repSocket, err.what()); - } - } - else - send_fail_reply(&repSocket, "Unsupported command"); - } - - // check if thread is interrupted - boost::this_thread::interruption_point(); - } - } - catch (boost::thread_interrupted&) {} - catch (zmq::error_t &e) - { - std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; - } + // create zmq reply socket for receiving ctrl parameters + zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); + std::cout << "Starting zmq remote control thread" << std::endl; + try + { + // connect the socket + int hwm = 100; + int linger = 0; + repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + repSocket.bind(m_endpoint.c_str()); + + // create pollitem that polls the ZMQ sockets + zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; + for(;;) + { + zmq::poll(pollItems, 1, 100); + std::vector<std::string> msg; + if (pollItems[0].revents & ZMQ_POLLIN) + { + recv_all(&repSocket, msg); + std::string command((char*)msg[0].data(), msg[0].size()); + + if (msg.size() == 1 && command == "ping") + { + send_ok_reply(&repSocket); + } + else if (msg.size() == 3 && command == "get") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + + try + { + std::string value = get_param_(module, parameter); + zmq::message_t *pMsg = new zmq::message_t(value.size()); + memcpy ((void*) pMsg->data(), value.data(), value.size()); + repSocket.send(*pMsg, 0); + delete pMsg; + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else if (msg.size() == 4 && command == "set") + { + std::string module((char*) msg[1].data(), msg[1].size()); + std::string parameter((char*) msg[2].data(), msg[2].size()); + std::string value((char*) msg[3].data(), msg[3].size()); + + try + { + set_param_(module, parameter, value); + send_ok_reply(&repSocket); + } + catch (ParameterError &err) + { + send_fail_reply(&repSocket, err.what()); + } + } + else + send_fail_reply(&repSocket, "Unsupported command"); + } + + // check if thread is interrupted + boost::this_thread::interruption_point(); + } + } + catch (boost::thread_interrupted&) {} + catch (zmq::error_t &e) + { + std::cerr << "ZMQ error: " << std::string(e.what()) << std::endl; + } catch (std::exception& e) { std::cerr << "Remote control caught exception: " << e.what() << std::endl; m_fault = true; } - repSocket.close(); + repSocket.close(); } #endif + |