summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-02-19 11:52:39 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-02-19 11:53:39 +0100
commit039f82969aff73749ff50e9d65950a85509fde0c (patch)
tree25709b273d3af7b47201c40514ab3ef54dd29477
parenta48f80b33a9859da52dfbcebf0bc69e80c1bb8e5 (diff)
downloaddabmux-039f82969aff73749ff50e9d65950a85509fde0c.tar.gz
dabmux-039f82969aff73749ff50e9d65950a85509fde0c.tar.bz2
dabmux-039f82969aff73749ff50e9d65950a85509fde0c.zip
ManagementServer: Use zmq poller instead of nonblock and usleep
-rw-r--r--src/ManagementServer.cpp57
-rw-r--r--src/ManagementServer.h25
2 files changed, 48 insertions, 34 deletions
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index d42a205..917d3c7 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -217,6 +217,30 @@ std::string ManagementServer::getValuesJSON()
return ss.str();
}
+ManagementServer::ManagementServer() :
+ m_zmq_context(),
+ m_zmq_sock(m_zmq_context, ZMQ_REP),
+ m_running(false),
+ m_fault(false)
+{ }
+
+ManagementServer::~ManagementServer()
+{
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ m_fault = false;
+ }
+}
+
+void ManagementServer::open(int listenport)
+{
+ m_listenport = listenport;
+ if (m_listenport > 0) {
+ m_thread = std::thread(&ManagementServer::serverThread, this);
+ }
+}
+
void ManagementServer::restart()
{
m_restarter_thread = thread(&ManagementServer::restart_thread, this, 0);
@@ -229,8 +253,9 @@ void ManagementServer::restart_thread(long)
{
m_running = false;
- if (m_listenport) {
+ if (m_thread.joinable()) {
m_thread.join();
+ m_fault = false;
}
m_thread = thread(&ManagementServer::serverThread, this);
@@ -241,20 +266,26 @@ void ManagementServer::serverThread()
m_running = true;
m_fault = false;
- std::stringstream bind_addr;
- bind_addr << "tcp://127.0.0.1:" << m_listenport;
- m_zmq_sock.bind(bind_addr.str().c_str());
+ try {
+ std::string bind_addr = "tcp://127.0.0.1:" + to_string(m_listenport);
+ m_zmq_sock.bind(bind_addr.c_str());
- while (m_running) {
- zmq::message_t zmq_message;
- if (m_zmq_sock.recv(&zmq_message, ZMQ_DONTWAIT)) {
- handle_message(zmq_message);
- }
- else {
- usleep(10000);
+ zmq::pollitem_t pollItems[] = { {m_zmq_sock, 0, ZMQ_POLLIN, 0} };
+
+ while (m_running) {
+ zmq::poll(pollItems, 1, 1000);
+
+ if (pollItems[0].revents & ZMQ_POLLIN) {
+ zmq::message_t zmq_message;
+ m_zmq_sock.recv(&zmq_message);
+ handle_message(zmq_message);
+ }
}
}
-
+ catch (const exception &e) {
+ etiLog.level(error) << "Exception in ManagementServer: " <<
+ e.what();
+ }
m_fault = true;
}
@@ -294,7 +325,7 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message)
std::string answerstr(answer.str());
m_zmq_sock.send(answerstr.c_str(), answerstr.size());
}
- catch (std::exception& e) {
+ catch (const std::exception& e) {
etiLog.level(error) <<
"MGMT server caught exception: " <<
e.what();
diff --git a/src/ManagementServer.h b/src/ManagementServer.h
index f8d722e..c0b50e7 100644
--- a/src/ManagementServer.h
+++ b/src/ManagementServer.h
@@ -135,29 +135,12 @@ class InputStat
class ManagementServer
{
public:
- ManagementServer() :
- m_zmq_context(),
- m_zmq_sock(m_zmq_context, ZMQ_REP),
- m_running(false),
- m_fault(false) { }
-
- ~ManagementServer()
- {
- m_running = false;
- m_fault = false;
- m_thread.join();
- }
-
+ ManagementServer();
ManagementServer(const ManagementServer& other) = delete;
ManagementServer& operator=(const ManagementServer& other) = delete;
+ ~ManagementServer();
- void open(int listenport)
- {
- m_listenport = listenport;
- if (m_listenport > 0) {
- m_thread = std::thread(&ManagementServer::serverThread, this);
- }
- }
+ void open(int listenport);
/* Un-/Register a statistics data source */
void registerInput(InputStat* is);
@@ -174,7 +157,7 @@ class ManagementServer
*/
void update_ptree(const boost::property_tree::ptree& pt);
- bool fault_detected() { return m_fault; }
+ bool fault_detected() const { return m_fault; }
void restart(void);
private: