summaryrefslogtreecommitdiffstats
path: root/src/ManagementServer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ManagementServer.cpp')
-rw-r--r--src/ManagementServer.cpp237
1 files changed, 128 insertions, 109 deletions
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index 9ebdfeb..9687278 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -39,6 +39,18 @@
#include "ManagementServer.h"
#include "Log.h"
+ManagementServer& get_mgmt_server()
+{
+ static ManagementServer mgmt_server;
+
+ return mgmt_server;
+
+ /* Warning, do not use the mgmt_server in the destructor
+ * of another global object: you don't know which one
+ * gets destroyed first
+ */
+}
+
void ManagementServer::registerInput(InputStat* is)
{
boost::mutex::scoped_lock lock(m_statsmutex);
@@ -47,7 +59,7 @@ void ManagementServer::registerInput(InputStat* is)
if (m_inputStats.count(id) == 1) {
etiLog.level(error) <<
- "Double registration in Stats Server with id '" <<
+ "Double registration in MGMT Server with id '" <<
id << "'";
return;
}
@@ -71,7 +83,7 @@ bool ManagementServer::isInputRegistered(std::string& id)
if (m_inputStats.count(id) == 0) {
etiLog.level(error) <<
- "Stats Server id '" <<
+ "Management Server: id '" <<
id << "' does was not registered";
return false;
}
@@ -185,132 +197,139 @@ void ManagementServer::restart_thread(long)
void ManagementServer::serverThread()
{
+ m_running = true;
m_fault = false;
- try {
- int accepted_sock;
- char buffer[256];
- char welcome_msg[256];
- struct sockaddr_in serv_addr, cli_addr;
- int n;
-
- int welcome_msg_len = snprintf(welcome_msg, 256,
- "{ \"service\": \""
- "%s %s Stats Server\" }\n",
- PACKAGE_NAME,
-#if defined(GITVERSION)
- GITVERSION
-#else
- PACKAGE_VERSION
-#endif
- );
-
+ std::stringstream bind_addr;
+ bind_addr << "tcp://127.0.0.1:" << m_listenport;
+ m_zmq_sock.bind(bind_addr.str().c_str());
- m_sock = socket(AF_INET, SOCK_STREAM, 0);
- if (m_sock < 0) {
- etiLog.level(error) << "Error opening Stats Server socket: " <<
- strerror(errno);
- m_fault = true;
- return;
- }
+ while (m_running) {
+ zmq::message_t zmq_message;
+ m_zmq_sock.recv(&zmq_message);
- memset(&serv_addr, 0, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
- serv_addr.sin_port = htons(m_listenport);
- if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- etiLog.level(error) << "Error binding Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
- }
+ handle_message(zmq_message);
+ }
- if (listen(m_sock, 5) < 0) {
- etiLog.level(error) << "Error listening on Stats Server socket: " <<
- strerror(errno);
- goto end_serverthread;
- }
+ m_fault = true;
+}
- m_running = true;
+bool ManagementServer::handle_setptree(
+ zmq::message_t& zmq_message, std::stringstream& answer)
+{
+ try {
+ if (zmq_message.more()) {
+ zmq::message_t zmq_new_ptree;
+ m_zmq_sock.recv(&zmq_new_ptree);
+ std::string new_ptree(
+ (char*)zmq_new_ptree.data(), zmq_new_ptree.size()
+ );
- while (m_running) {
- socklen_t cli_addr_len = sizeof(cli_addr);
+ etiLog.level(info) << "Received ptree " << new_ptree;
- /* Accept actual connection from the client */
- accepted_sock = accept(m_sock,
- (struct sockaddr *)&cli_addr,
- &cli_addr_len);
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pt.clear();
- if (accepted_sock < 0) {
- etiLog.level(warn) << "Stats Server cound not accept connection: " <<
- strerror(errno);
- continue;
- }
- /* Send welcome message with version */
- n = write(accepted_sock, welcome_msg, welcome_msg_len);
- if (n < 0) {
- etiLog.level(warn) << "Error writing to Stats Server socket " <<
- strerror(errno);
- close(accepted_sock);
- continue;
- }
+ std::stringstream json_stream;
+ json_stream << new_ptree;
+ boost::property_tree::json_parser::read_json(json_stream, m_pt);
- /* receive command */
- memset(buffer, 0, 256);
- int n = read(accepted_sock, buffer, 255);
- if (n < 0) {
- etiLog.level(warn) << "Error reading from Stats Server socket " <<
- strerror(errno);
- close(accepted_sock);
- continue;
- }
+ m_retrieve_pending = true;
+ answer << "OK";
- if (strcmp(buffer, "config\n") == 0) {
- std::string json = getStatConfigJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "values\n") == 0) {
- std::string json = getValuesJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "state\n") == 0) {
- std::string json = getStateJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- if (strcmp(buffer, "getptree\n") == 0) {
- boost::unique_lock<boost::mutex> lock(m_configmutex);
- m_pending = true;
+ return true;
+ }
+ else {
+ etiLog.level(error) <<
+ "MGMT: setptree command is missing data.";
+ }
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) <<
+ "MGMT: setptree error." << e.what();
+ }
+ return false;
+}
- while (m_pending) {
- m_condition.wait(lock);
- }
- std::stringstream ss;
- boost::property_tree::json_parser::write_json(ss, m_pt);
+void ManagementServer::handle_message(zmq::message_t& zmq_message)
+{
+ std::stringstream answer;
+ std::string data((char*)zmq_message.data(), zmq_message.size());
- std::string response = ss.str();
+ try {
+ etiLog.level(info) << "RC: Accepted";
- n = write(accepted_sock, response.c_str(), response.size());
- }
- else {
- int len = snprintf(buffer, 256, "Invalid command\n");
- n = write(accepted_sock, buffer, len);
- }
+ if (data == "info") {
+ answer <<
+ "{ \"service\": \"" <<
+ PACKAGE_NAME << " " <<
+#if defined(GITVERSION)
+ GITVERSION <<
+#else
+ PACKAGE_VERSION <<
+#endif
+ " MGMT Server\" }\n";
+ }
+ else if (data == "config") {
+ answer << getStatConfigJSON();
+ }
+ else if (data == "values") {
+ answer << getValuesJSON();
+ }
+ else if (data == "state") {
+ answer << getStateJSON();
+ }
+ else if (data == "setptree") {
+ handle_setptree(zmq_message, answer);
+ }
+ else if (data == "getptree") {
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pending = true;
- if (n < 0) {
- etiLog.level(warn) << "Error writing to Stats Server socket " <<
- strerror(errno);
+ while (m_pending && !m_retrieve_pending) {
+ m_condition.wait(lock);
}
- close(accepted_sock);
+ boost::property_tree::json_parser::write_json(answer, m_pt);
+ }
+ else {
+ answer << "Invalid command";
}
-end_serverthread:
- m_fault = true;
- close(m_sock);
-
+ std::string answerstr(answer.str());
+ m_zmq_sock.send(answerstr.c_str(), answerstr.size());
}
catch (std::exception& e) {
- etiLog.level(error) << "Statistics server caught exception: " << e.what();
- m_fault = true;
+ etiLog.level(error) <<
+ "MGMT server caught exception: " <<
+ e.what();
+ }
+}
+
+bool ManagementServer::retrieve_new_ptree(boost::property_tree::ptree& pt)
+{
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+
+ if (m_retrieve_pending)
+ {
+ pt = m_pt;
+
+ m_retrieve_pending = false;
+ m_condition.notify_one();
+ return true;
+ }
+
+ return false;
+}
+
+void ManagementServer::update_ptree(const boost::property_tree::ptree& pt)
+{
+ if (m_running) {
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pt = pt;
+ m_pending = false;
+
+ m_condition.notify_one();
}
}
@@ -318,12 +337,12 @@ end_serverthread:
void InputStat::registerAtServer()
{
- mgmt_server->registerInput(this);
+ get_mgmt_server().registerInput(this);
}
InputStat::~InputStat()
{
- mgmt_server->unregisterInput(m_name);
+ get_mgmt_server().unregisterInput(m_name);
}
std::string InputStat::encodeValuesJSON()