aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-03-20 08:48:48 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-06-19 10:28:30 +0200
commit0ac177534620caa13864f9bfcd804004e3e538fd (patch)
treed5d9fdc8651f50bd4258b7f2bdae1dc78e4d6982
parent752fa808e3f148f45b689a026f0e703c83b83d92 (diff)
downloaddabmux-0ac177534620caa13864f9bfcd804004e3e538fd.tar.gz
dabmux-0ac177534620caa13864f9bfcd804004e3e538fd.tar.bz2
dabmux-0ac177534620caa13864f9bfcd804004e3e538fd.zip
Add ptree upload to mgmt server (incomplete)
-rw-r--r--src/DabMux.cpp4
-rw-r--r--src/ManagementServer.cpp87
-rw-r--r--src/ManagementServer.h17
3 files changed, 88 insertions, 20 deletions
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 8bd2549..107431b 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -2178,6 +2178,10 @@ int main(int argc, char *argv[])
else if (mgmt_server->request_pending()) {
mgmt_server->update_ptree(pt);
}
+ /*
+ else if (mgmt_server->retrieve_new_ptree(pt)) {
+ }
+ */
}
}
}
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index ae3a0cb..78f1c9d 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -47,7 +47,7 @@ void ManagementServer::registerInput(InputStat* is)
if (m_inputStats.count(id) == 1) {
etiLog.level(error) <<
- "Double registration in Stats Server with id '" <<
+ "Double registration in MGMT Server with id '" <<
id << "'";
return;
}
@@ -196,7 +196,7 @@ void ManagementServer::serverThread()
int welcome_msg_len = snprintf(welcome_msg, 256,
"{ \"service\": \""
- "%s %s Stats Server\" }\n",
+ "%s %s MGMT Server\" }\n",
PACKAGE_NAME,
#if defined(GITVERSION)
GITVERSION
@@ -208,7 +208,8 @@ void ManagementServer::serverThread()
m_sock = socket(AF_INET, SOCK_STREAM, 0);
if (m_sock < 0) {
- etiLog.level(error) << "Error opening Stats Server socket: " <<
+ etiLog.level(error) <<
+ "Error opening MGMT Server socket: " <<
strerror(errno);
m_fault = true;
return;
@@ -220,13 +221,15 @@ void ManagementServer::serverThread()
serv_addr.sin_port = htons(m_listenport);
if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- etiLog.level(error) << "Error binding Stats Server socket: " <<
+ etiLog.level(error) <<
+ "Error binding MGMT Server socket: " <<
strerror(errno);
goto end_serverthread;
}
if (listen(m_sock, 5) < 0) {
- etiLog.level(error) << "Error listening on Stats Server socket: " <<
+ etiLog.level(error) <<
+ "Error listening on MGMT Server socket: " <<
strerror(errno);
goto end_serverthread;
}
@@ -242,14 +245,15 @@ void ManagementServer::serverThread()
&cli_addr_len);
if (accepted_sock < 0) {
- etiLog.level(warn) << "Stats Server cound not accept connection: " <<
+ etiLog.level(warn) << "MGMT Server cound not accept connection: " <<
strerror(errno);
continue;
}
/* Send welcome message with version */
n = write(accepted_sock, welcome_msg, welcome_msg_len);
if (n < 0) {
- etiLog.level(warn) << "Error writing to Stats Server socket " <<
+ etiLog.level(warn) <<
+ "MGMT: Error writing to Stats Server socket " <<
strerror(errno);
close(accepted_sock);
continue;
@@ -259,7 +263,8 @@ void ManagementServer::serverThread()
memset(buffer, 0, 256);
int n = read(accepted_sock, buffer, 255);
if (n < 0) {
- etiLog.level(warn) << "Error reading from Stats Server socket " <<
+ etiLog.level(warn) <<
+ "MGMT: Error reading from Stats Server socket " <<
strerror(errno);
close(accepted_sock);
continue;
@@ -277,11 +282,41 @@ void ManagementServer::serverThread()
std::string json = getStateJSON();
n = write(accepted_sock, json.c_str(), json.size());
}
+ else if (strcmp(buffer, "setptree\n") == 0) {
+ const ssize_t max_json_len = 32768;
+ char json[max_json_len] = {'\0'};
+
+ ssize_t json_len = read(accepted_sock, json, max_json_len);
+
+ if (json_len < max_json_len) {
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+
+ std::stringstream ss;
+ ss << json;
+
+ m_pt.clear();
+ boost::property_tree::json_parser::read_json(ss, m_pt);
+ }
+ else if (json_len == 0) {
+ etiLog.level(warn) <<
+ "MGMT: No JSON data received";
+ }
+ else if (json_len < 0) {
+ etiLog.level(warn) <<
+ "MGMT: JSON data receive error: " <<
+ strerror(errno);
+ }
+ else {
+ etiLog.level(warn) <<
+ "MGMT: Received JSON too large";
+ }
+
+ }
else if (strcmp(buffer, "getptree\n") == 0) {
boost::unique_lock<boost::mutex> lock(m_configmutex);
m_pending = true;
- while (m_pending) {
+ while (m_pending && !m_retrieve_pending) {
m_condition.wait(lock);
}
std::stringstream ss;
@@ -297,7 +332,8 @@ void ManagementServer::serverThread()
}
if (n < 0) {
- etiLog.level(warn) << "Error writing to Stats Server socket " <<
+ etiLog.level(warn) <<
+ "Error writing to MGMT Server socket " <<
strerror(errno);
}
close(accepted_sock);
@@ -309,11 +345,40 @@ end_serverthread:
}
catch (std::exception& e) {
- etiLog.level(error) << "Statistics server caught exception: " << e.what();
+ etiLog.level(error) <<
+ "MGMT server caught exception: " <<
+ e.what();
m_fault = true;
}
}
+bool ManagementServer::retrieve_new_ptree(boost::property_tree::ptree& pt)
+{
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+
+ if (m_retrieve_pending)
+ {
+ pt = m_pt;
+
+ m_retrieve_pending = false;
+ m_condition.notify_one();
+ return true;
+ }
+
+ return false;
+}
+
+void ManagementServer::update_ptree(const boost::property_tree::ptree& pt)
+{
+ if (m_running) {
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pt = pt;
+ m_pending = false;
+
+ m_condition.notify_one();
+ }
+}
+
/************************************************/
void InputStat::registerAtServer()
diff --git a/src/ManagementServer.h b/src/ManagementServer.h
index fa3f170..273f576 100644
--- a/src/ManagementServer.h
+++ b/src/ManagementServer.h
@@ -351,18 +351,16 @@ class ManagementServer
/* Ask if there is a configuration request pending */
bool request_pending() { return m_pending; }
+ /* Load a ptree given by the management server.
+ *
+ * Returns true if the ptree was updated
+ */
+ bool retrieve_new_ptree(boost::property_tree::ptree& pt);
+
/* Update the copy of the configuration property tree and notify the
* update to the internal server thread.
*/
- void update_ptree(const boost::property_tree::ptree& pt) {
- if (m_running) {
- boost::unique_lock<boost::mutex> lock(m_configmutex);
- m_pt = pt;
- m_pending = false;
-
- m_condition.notify_one();
- }
- }
+ void update_ptree(const boost::property_tree::ptree& pt);
bool fault_detected() { return m_fault; }
void restart(void);
@@ -415,6 +413,7 @@ class ManagementServer
/******** Configuration Data *******/
bool m_pending;
+ bool m_retrieve_pending;
boost::condition_variable m_condition;
mutable boost::mutex m_configmutex;