summaryrefslogtreecommitdiffstats
path: root/src/ManagementServer.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-06-21 21:56:59 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-06-21 22:10:44 +0200
commit1328d62f9d3a2eb9f089d531614302005c29ec37 (patch)
tree17c179b21813f3dbea0d83535d0523dd411908f8 /src/ManagementServer.cpp
parent711f52b5a1f114ae911d0e072498c81831c0b814 (diff)
downloaddabmux-1328d62f9d3a2eb9f089d531614302005c29ec37.tar.gz
dabmux-1328d62f9d3a2eb9f089d531614302005c29ec37.tar.bz2
dabmux-1328d62f9d3a2eb9f089d531614302005c29ec37.zip
Replace MGMT socket by ZMQ, make services shared_ptr
Diffstat (limited to 'src/ManagementServer.cpp')
-rw-r--r--src/ManagementServer.cpp169
1 files changed, 68 insertions, 101 deletions
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index fc0a43e..9687278 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -197,140 +197,107 @@ void ManagementServer::restart_thread(long)
void ManagementServer::serverThread()
{
- using boost::asio::ip::tcp;
-
m_running = true;
m_fault = false;
- while (m_running) {
- m_io_service.reset();
-
- tcp::acceptor acceptor(m_io_service, tcp::endpoint(
- boost::asio::ip::address::from_string("127.0.0.1"),
- m_listenport) );
+ std::stringstream bind_addr;
+ bind_addr << "tcp://127.0.0.1:" << m_listenport;
+ m_zmq_sock.bind(bind_addr.str().c_str());
+ while (m_running) {
+ zmq::message_t zmq_message;
+ m_zmq_sock.recv(&zmq_message);
- // Add a job to start accepting connections.
- boost::shared_ptr<tcp::socket> socket(
- new tcp::socket(acceptor.get_io_service()));
-
- // Add an accept call to the service. This will prevent io_service::run()
- // from returning.
- etiLog.level(warn) << "MGMT: Waiting on connection";
- acceptor.async_accept(*socket,
- boost::bind(&ManagementServer::handle_accept,
- this,
- boost::asio::placeholders::error,
- socket,
- boost::ref(acceptor)));
- // Process event loop.
- m_io_service.run();
+ handle_message(zmq_message);
}
m_fault = true;
}
-void ManagementServer::handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor)
+bool ManagementServer::handle_setptree(
+ zmq::message_t& zmq_message, std::stringstream& answer)
{
- if (boost_error)
- {
- etiLog.level(error) << "MGMT: Error accepting connection";
- return;
- }
-
- std::stringstream welcome_msg;
-
- welcome_msg <<
- "{ \"service\": \"" <<
- PACKAGE_NAME << " " <<
-#if defined(GITVERSION)
- GITVERSION <<
-#else
- PACKAGE_VERSION <<
-#endif
- " MGMT Server\" }\n";
-
try {
- etiLog.level(info) << "RC: Accepted";
+ if (zmq_message.more()) {
+ zmq::message_t zmq_new_ptree;
+ m_zmq_sock.recv(&zmq_new_ptree);
+ std::string new_ptree(
+ (char*)zmq_new_ptree.data(), zmq_new_ptree.size()
+ );
- boost::system::error_code ignored_error;
+ etiLog.level(info) << "Received ptree " << new_ptree;
- boost::asio::write(*socket, boost::asio::buffer(welcome_msg.str()),
- boost::asio::transfer_all(),
- ignored_error);
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pt.clear();
- boost::asio::streambuf buffer;
- size_t length =
- boost::asio::read_until(*socket, buffer, "\n", ignored_error);
+ std::stringstream json_stream;
+ json_stream << new_ptree;
+ boost::property_tree::json_parser::read_json(json_stream, m_pt);
- std::string in_message;
- std::istream str(&buffer);
- std::getline(str, in_message);
+ m_retrieve_pending = true;
+ answer << "OK";
- if (in_message == "config") {
- std::string json = getStatConfigJSON();
- boost::asio::write(*socket, boost::asio::buffer(json),
- boost::asio::transfer_all(),
- ignored_error);
+ return true;
}
- else if (in_message == "values") {
- std::string json = getValuesJSON();
- boost::asio::write(*socket, boost::asio::buffer(json),
- boost::asio::transfer_all(),
- ignored_error);
+ else {
+ etiLog.level(error) <<
+ "MGMT: setptree command is missing data.";
}
- else if (in_message == "state") {
- std::string json = getStateJSON();
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) <<
+ "MGMT: setptree error." << e.what();
+ }
+ return false;
+}
- boost::asio::write(*socket, boost::asio::buffer(json),
- boost::asio::transfer_all(),
- ignored_error);
- }
- else if (in_message == "setptree") {
- boost::asio::streambuf jsonbuffer;
- length = boost::asio::read_until(
- *socket, jsonbuffer, "\n", ignored_error);
+void ManagementServer::handle_message(zmq::message_t& zmq_message)
+{
+ std::stringstream answer;
+ std::string data((char*)zmq_message.data(), zmq_message.size());
- if (length > 0) {
- boost::unique_lock<boost::mutex> lock(m_configmutex);
- m_pt.clear();
+ try {
+ etiLog.level(info) << "RC: Accepted";
- std::istream json_stream(&jsonbuffer);
- boost::property_tree::json_parser::read_json(json_stream, m_pt);
- }
- else if (length == 0) {
- etiLog.level(warn) <<
- "MGMT: No JSON data received";
- }
- else {
- etiLog.level(error) <<
- "MGMT: Error JSON reception";
- }
+ if (data == "info") {
+ answer <<
+ "{ \"service\": \"" <<
+ PACKAGE_NAME << " " <<
+#if defined(GITVERSION)
+ GITVERSION <<
+#else
+ PACKAGE_VERSION <<
+#endif
+ " MGMT Server\" }\n";
+ }
+ else if (data == "config") {
+ answer << getStatConfigJSON();
+ }
+ else if (data == "values") {
+ answer << getValuesJSON();
}
- else if (in_message == "getptree") {
+ else if (data == "state") {
+ answer << getStateJSON();
+ }
+ else if (data == "setptree") {
+ handle_setptree(zmq_message, answer);
+ }
+ else if (data == "getptree") {
boost::unique_lock<boost::mutex> lock(m_configmutex);
m_pending = true;
while (m_pending && !m_retrieve_pending) {
m_condition.wait(lock);
}
- std::stringstream ss;
- boost::property_tree::json_parser::write_json(ss, m_pt);
-
- boost::asio::write(*socket, boost::asio::buffer(ss.str()),
- boost::asio::transfer_all(),
- ignored_error);
+ boost::property_tree::json_parser::write_json(answer, m_pt);
}
else {
- std::string invcmd("Invalid command\n");
- boost::asio::write(*socket, boost::asio::buffer(invcmd),
- boost::asio::transfer_all(),
- ignored_error);
+ answer << "Invalid command";
}
+
+ std::string answerstr(answer.str());
+ m_zmq_sock.send(answerstr.c_str(), answerstr.size());
}
catch (std::exception& e) {
etiLog.level(error) <<