diff options
Diffstat (limited to 'src/ManagementServer.cpp')
-rw-r--r-- | src/ManagementServer.cpp | 169 |
1 files changed, 68 insertions, 101 deletions
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index fc0a43e..9687278 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -197,140 +197,107 @@ void ManagementServer::restart_thread(long) void ManagementServer::serverThread() { - using boost::asio::ip::tcp; - m_running = true; m_fault = false; - while (m_running) { - m_io_service.reset(); - - tcp::acceptor acceptor(m_io_service, tcp::endpoint( - boost::asio::ip::address::from_string("127.0.0.1"), - m_listenport) ); + std::stringstream bind_addr; + bind_addr << "tcp://127.0.0.1:" << m_listenport; + m_zmq_sock.bind(bind_addr.str().c_str()); + while (m_running) { + zmq::message_t zmq_message; + m_zmq_sock.recv(&zmq_message); - // Add a job to start accepting connections. - boost::shared_ptr<tcp::socket> socket( - new tcp::socket(acceptor.get_io_service())); - - // Add an accept call to the service. This will prevent io_service::run() - // from returning. - etiLog.level(warn) << "MGMT: Waiting on connection"; - acceptor.async_accept(*socket, - boost::bind(&ManagementServer::handle_accept, - this, - boost::asio::placeholders::error, - socket, - boost::ref(acceptor))); - // Process event loop. - m_io_service.run(); + handle_message(zmq_message); } m_fault = true; } -void ManagementServer::handle_accept( - const boost::system::error_code& boost_error, - boost::shared_ptr< boost::asio::ip::tcp::socket > socket, - boost::asio::ip::tcp::acceptor& acceptor) +bool ManagementServer::handle_setptree( + zmq::message_t& zmq_message, std::stringstream& answer) { - if (boost_error) - { - etiLog.level(error) << "MGMT: Error accepting connection"; - return; - } - - std::stringstream welcome_msg; - - welcome_msg << - "{ \"service\": \"" << - PACKAGE_NAME << " " << -#if defined(GITVERSION) - GITVERSION << -#else - PACKAGE_VERSION << -#endif - " MGMT Server\" }\n"; - try { - etiLog.level(info) << "RC: Accepted"; + if (zmq_message.more()) { + zmq::message_t zmq_new_ptree; + m_zmq_sock.recv(&zmq_new_ptree); + std::string new_ptree( + (char*)zmq_new_ptree.data(), zmq_new_ptree.size() + ); - boost::system::error_code ignored_error; + etiLog.level(info) << "Received ptree " << new_ptree; - boost::asio::write(*socket, boost::asio::buffer(welcome_msg.str()), - boost::asio::transfer_all(), - ignored_error); + boost::unique_lock<boost::mutex> lock(m_configmutex); + m_pt.clear(); - boost::asio::streambuf buffer; - size_t length = - boost::asio::read_until(*socket, buffer, "\n", ignored_error); + std::stringstream json_stream; + json_stream << new_ptree; + boost::property_tree::json_parser::read_json(json_stream, m_pt); - std::string in_message; - std::istream str(&buffer); - std::getline(str, in_message); + m_retrieve_pending = true; + answer << "OK"; - if (in_message == "config") { - std::string json = getStatConfigJSON(); - boost::asio::write(*socket, boost::asio::buffer(json), - boost::asio::transfer_all(), - ignored_error); + return true; } - else if (in_message == "values") { - std::string json = getValuesJSON(); - boost::asio::write(*socket, boost::asio::buffer(json), - boost::asio::transfer_all(), - ignored_error); + else { + etiLog.level(error) << + "MGMT: setptree command is missing data."; } - else if (in_message == "state") { - std::string json = getStateJSON(); + } + catch (std::exception& e) { + etiLog.level(error) << + "MGMT: setptree error." << e.what(); + } + return false; +} - boost::asio::write(*socket, boost::asio::buffer(json), - boost::asio::transfer_all(), - ignored_error); - } - else if (in_message == "setptree") { - boost::asio::streambuf jsonbuffer; - length = boost::asio::read_until( - *socket, jsonbuffer, "\n", ignored_error); +void ManagementServer::handle_message(zmq::message_t& zmq_message) +{ + std::stringstream answer; + std::string data((char*)zmq_message.data(), zmq_message.size()); - if (length > 0) { - boost::unique_lock<boost::mutex> lock(m_configmutex); - m_pt.clear(); + try { + etiLog.level(info) << "RC: Accepted"; - std::istream json_stream(&jsonbuffer); - boost::property_tree::json_parser::read_json(json_stream, m_pt); - } - else if (length == 0) { - etiLog.level(warn) << - "MGMT: No JSON data received"; - } - else { - etiLog.level(error) << - "MGMT: Error JSON reception"; - } + if (data == "info") { + answer << + "{ \"service\": \"" << + PACKAGE_NAME << " " << +#if defined(GITVERSION) + GITVERSION << +#else + PACKAGE_VERSION << +#endif + " MGMT Server\" }\n"; + } + else if (data == "config") { + answer << getStatConfigJSON(); + } + else if (data == "values") { + answer << getValuesJSON(); } - else if (in_message == "getptree") { + else if (data == "state") { + answer << getStateJSON(); + } + else if (data == "setptree") { + handle_setptree(zmq_message, answer); + } + else if (data == "getptree") { boost::unique_lock<boost::mutex> lock(m_configmutex); m_pending = true; while (m_pending && !m_retrieve_pending) { m_condition.wait(lock); } - std::stringstream ss; - boost::property_tree::json_parser::write_json(ss, m_pt); - - boost::asio::write(*socket, boost::asio::buffer(ss.str()), - boost::asio::transfer_all(), - ignored_error); + boost::property_tree::json_parser::write_json(answer, m_pt); } else { - std::string invcmd("Invalid command\n"); - boost::asio::write(*socket, boost::asio::buffer(invcmd), - boost::asio::transfer_all(), - ignored_error); + answer << "Invalid command"; } + + std::string answerstr(answer.str()); + m_zmq_sock.send(answerstr.c_str(), answerstr.size()); } catch (std::exception& e) { etiLog.level(error) << |