diff options
Diffstat (limited to 'src/ManagementServer.cpp')
-rw-r--r-- | src/ManagementServer.cpp | 237 |
1 files changed, 128 insertions, 109 deletions
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 9ebdfeb..9687278 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -39,6 +39,18 @@ #include "ManagementServer.h" #include "Log.h" +ManagementServer& get_mgmt_server() +{ + static ManagementServer mgmt_server; + + return mgmt_server; + + /* Warning, do not use the mgmt_server in the destructor + * of another global object: you don't know which one + * gets destroyed first + */ +} + void ManagementServer::registerInput(InputStat* is) { boost::mutex::scoped_lock lock(m_statsmutex); @@ -47,7 +59,7 @@ void ManagementServer::registerInput(InputStat* is) if (m_inputStats.count(id) == 1) { etiLog.level(error) << - "Double registration in Stats Server with id '" << + "Double registration in MGMT Server with id '" << id << "'"; return; } @@ -71,7 +83,7 @@ bool ManagementServer::isInputRegistered(std::string& id) if (m_inputStats.count(id) == 0) { etiLog.level(error) << - "Stats Server id '" << + "Management Server: id '" << id << "' does was not registered"; return false; } @@ -185,132 +197,139 @@ void ManagementServer::restart_thread(long) void ManagementServer::serverThread() { + m_running = true; m_fault = false; - try { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "{ \"service\": \"" - "%s %s Stats Server\" }\n", - PACKAGE_NAME, -#if defined(GITVERSION) - GITVERSION -#else - PACKAGE_VERSION -#endif - ); - + std::stringstream bind_addr; + bind_addr << "tcp://127.0.0.1:" << m_listenport; + m_zmq_sock.bind(bind_addr.str().c_str()); - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << "Error opening Stats Server socket: " << - strerror(errno); - m_fault = true; - return; - } + while (m_running) { + zmq::message_t zmq_message; + m_zmq_sock.recv(&zmq_message); - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << "Error binding Stats Server socket: " << - strerror(errno); - goto end_serverthread; - } + handle_message(zmq_message); + } - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << "Error listening on Stats Server socket: " << - strerror(errno); - goto end_serverthread; - } + m_fault = true; +} - m_running = true; +bool ManagementServer::handle_setptree( + zmq::message_t& zmq_message, std::stringstream& answer) +{ + try { + if (zmq_message.more()) { + zmq::message_t zmq_new_ptree; + m_zmq_sock.recv(&zmq_new_ptree); + std::string new_ptree( + (char*)zmq_new_ptree.data(), zmq_new_ptree.size() + ); - while (m_running) { - socklen_t cli_addr_len = sizeof(cli_addr); + etiLog.level(info) << "Received ptree " << new_ptree; - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, - (struct sockaddr *)&cli_addr, - &cli_addr_len); + boost::unique_lock<boost::mutex> lock(m_configmutex); + m_pt.clear(); - if (accepted_sock < 0) { - etiLog.level(warn) << "Stats Server cound not accept connection: " << - strerror(errno); - continue; - } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } + std::stringstream json_stream; + json_stream << new_ptree; + boost::property_tree::json_parser::read_json(json_stream, m_pt); - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << "Error reading from Stats Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } + m_retrieve_pending = true; + answer << "OK"; - if (strcmp(buffer, "config\n") == 0) { - std::string json = getStatConfigJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "values\n") == 0) { - std::string json = getValuesJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "state\n") == 0) { - std::string json = getStateJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - if (strcmp(buffer, "getptree\n") == 0) { - boost::unique_lock<boost::mutex> lock(m_configmutex); - m_pending = true; + return true; + } + else { + etiLog.level(error) << + "MGMT: setptree command is missing data."; + } + } + catch (std::exception& e) { + etiLog.level(error) << + "MGMT: setptree error." << e.what(); + } + return false; +} - while (m_pending) { - m_condition.wait(lock); - } - std::stringstream ss; - boost::property_tree::json_parser::write_json(ss, m_pt); +void ManagementServer::handle_message(zmq::message_t& zmq_message) +{ + std::stringstream answer; + std::string data((char*)zmq_message.data(), zmq_message.size()); - std::string response = ss.str(); + try { + etiLog.level(info) << "RC: Accepted"; - n = write(accepted_sock, response.c_str(), response.size()); - } - else { - int len = snprintf(buffer, 256, "Invalid command\n"); - n = write(accepted_sock, buffer, len); - } + if (data == "info") { + answer << + "{ \"service\": \"" << + PACKAGE_NAME << " " << +#if defined(GITVERSION) + GITVERSION << +#else + PACKAGE_VERSION << +#endif + " MGMT Server\" }\n"; + } + else if (data == "config") { + answer << getStatConfigJSON(); + } + else if (data == "values") { + answer << getValuesJSON(); + } + else if (data == "state") { + answer << getStateJSON(); + } + else if (data == "setptree") { + handle_setptree(zmq_message, answer); + } + else if (data == "getptree") { + boost::unique_lock<boost::mutex> lock(m_configmutex); + m_pending = true; - if (n < 0) { - etiLog.level(warn) << "Error writing to Stats Server socket " << - strerror(errno); + while (m_pending && !m_retrieve_pending) { + m_condition.wait(lock); } - close(accepted_sock); + boost::property_tree::json_parser::write_json(answer, m_pt); + } + else { + answer << "Invalid command"; } -end_serverthread: - m_fault = true; - close(m_sock); - + std::string answerstr(answer.str()); + m_zmq_sock.send(answerstr.c_str(), answerstr.size()); } catch (std::exception& e) { - etiLog.level(error) << "Statistics server caught exception: " << e.what(); - m_fault = true; + etiLog.level(error) << + "MGMT server caught exception: " << + e.what(); + } +} + +bool ManagementServer::retrieve_new_ptree(boost::property_tree::ptree& pt) +{ + boost::unique_lock<boost::mutex> lock(m_configmutex); + + if (m_retrieve_pending) + { + pt = m_pt; + + m_retrieve_pending = false; + m_condition.notify_one(); + return true; + } + + return false; +} + +void ManagementServer::update_ptree(const boost::property_tree::ptree& pt) +{ + if (m_running) { + boost::unique_lock<boost::mutex> lock(m_configmutex); + m_pt = pt; + m_pending = false; + + m_condition.notify_one(); } } @@ -318,12 +337,12 @@ end_serverthread: void InputStat::registerAtServer() { - mgmt_server->registerInput(this); + get_mgmt_server().registerInput(this); } InputStat::~InputStat() { - mgmt_server->unregisterInput(m_name); + get_mgmt_server().unregisterInput(m_name); } std::string InputStat::encodeValuesJSON() |