From 039f82969aff73749ff50e9d65950a85509fde0c Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 19 Feb 2018 11:52:39 +0100 Subject: ManagementServer: Use zmq poller instead of nonblock and usleep --- src/ManagementServer.cpp | 57 +++++++++++++++++++++++++++++++++++++----------- src/ManagementServer.h | 25 ++++----------------- 2 files changed, 48 insertions(+), 34 deletions(-) diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index d42a205..917d3c7 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -217,6 +217,30 @@ std::string ManagementServer::getValuesJSON() return ss.str(); } +ManagementServer::ManagementServer() : + m_zmq_context(), + m_zmq_sock(m_zmq_context, ZMQ_REP), + m_running(false), + m_fault(false) +{ } + +ManagementServer::~ManagementServer() +{ + m_running = false; + if (m_thread.joinable()) { + m_thread.join(); + m_fault = false; + } +} + +void ManagementServer::open(int listenport) +{ + m_listenport = listenport; + if (m_listenport > 0) { + m_thread = std::thread(&ManagementServer::serverThread, this); + } +} + void ManagementServer::restart() { m_restarter_thread = thread(&ManagementServer::restart_thread, this, 0); @@ -229,8 +253,9 @@ void ManagementServer::restart_thread(long) { m_running = false; - if (m_listenport) { + if (m_thread.joinable()) { m_thread.join(); + m_fault = false; } m_thread = thread(&ManagementServer::serverThread, this); @@ -241,20 +266,26 @@ void ManagementServer::serverThread() m_running = true; m_fault = false; - std::stringstream bind_addr; - bind_addr << "tcp://127.0.0.1:" << m_listenport; - m_zmq_sock.bind(bind_addr.str().c_str()); + try { + std::string bind_addr = "tcp://127.0.0.1:" + to_string(m_listenport); + m_zmq_sock.bind(bind_addr.c_str()); - while (m_running) { - zmq::message_t zmq_message; - if (m_zmq_sock.recv(&zmq_message, ZMQ_DONTWAIT)) { - handle_message(zmq_message); - } - else { - usleep(10000); + zmq::pollitem_t pollItems[] = { {m_zmq_sock, 0, ZMQ_POLLIN, 0} }; + + while (m_running) { + zmq::poll(pollItems, 1, 1000); + + if (pollItems[0].revents & ZMQ_POLLIN) { + zmq::message_t zmq_message; + m_zmq_sock.recv(&zmq_message); + handle_message(zmq_message); + } } } - + catch (const exception &e) { + etiLog.level(error) << "Exception in ManagementServer: " << + e.what(); + } m_fault = true; } @@ -294,7 +325,7 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message) std::string answerstr(answer.str()); m_zmq_sock.send(answerstr.c_str(), answerstr.size()); } - catch (std::exception& e) { + catch (const std::exception& e) { etiLog.level(error) << "MGMT server caught exception: " << e.what(); diff --git a/src/ManagementServer.h b/src/ManagementServer.h index f8d722e..c0b50e7 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -135,29 +135,12 @@ class InputStat class ManagementServer { public: - ManagementServer() : - m_zmq_context(), - m_zmq_sock(m_zmq_context, ZMQ_REP), - m_running(false), - m_fault(false) { } - - ~ManagementServer() - { - m_running = false; - m_fault = false; - m_thread.join(); - } - + ManagementServer(); ManagementServer(const ManagementServer& other) = delete; ManagementServer& operator=(const ManagementServer& other) = delete; + ~ManagementServer(); - void open(int listenport) - { - m_listenport = listenport; - if (m_listenport > 0) { - m_thread = std::thread(&ManagementServer::serverThread, this); - } - } + void open(int listenport); /* Un-/Register a statistics data source */ void registerInput(InputStat* is); @@ -174,7 +157,7 @@ class ManagementServer */ void update_ptree(const boost::property_tree::ptree& pt); - bool fault_detected() { return m_fault; } + bool fault_detected() const { return m_fault; } void restart(void); private: -- cgit v1.2.3