aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-03-22 21:25:46 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-06-19 10:43:57 +0200
commit93cbefeeffcba044ab95ee46307ab50ec717bf2b (patch)
tree4a92f0bf1715ac8eeef149a6e2d33d318513c2d4
parent76e7f0f79c908bf7d0a447ea643dbcdde8f064d2 (diff)
downloaddabmux-93cbefeeffcba044ab95ee46307ab50ec717bf2b.tar.gz
dabmux-93cbefeeffcba044ab95ee46307ab50ec717bf2b.tar.bz2
dabmux-93cbefeeffcba044ab95ee46307ab50ec717bf2b.zip
Use boost::asio in Management Server
-rw-r--r--src/DabMux.cpp2
-rw-r--r--src/ManagementServer.cpp246
-rw-r--r--src/ManagementServer.h30
3 files changed, 126 insertions, 152 deletions
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 9fdc560..972a4a2 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -451,7 +451,7 @@ int main(int argc, char *argv[])
PFT edi_pft(207, 3, edi_conf);
#endif
- ssize_t limit = pt.get("general.nbframes", 0);
+ size_t limit = pt.get("general.nbframes", 0);
etiLog.level(info) << "Start loop";
/* Each iteration of the main loop creates one ETI frame */
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index 0aa9ed8..fe1a0be 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -197,170 +197,144 @@ void ManagementServer::restart_thread(long)
void ManagementServer::serverThread()
{
+ using boost::asio::ip::tcp;
+
+ m_running = true;
m_fault = false;
- try {
- int accepted_sock;
- char buffer[256];
- char welcome_msg[256];
- struct sockaddr_in serv_addr, cli_addr;
- int n;
-
- int welcome_msg_len = snprintf(welcome_msg, 256,
- "{ \"service\": \""
- "%s %s MGMT Server\" }\n",
- PACKAGE_NAME,
-#if defined(GITVERSION)
- GITVERSION
-#else
- PACKAGE_VERSION
-#endif
- );
+ while (m_running) {
+ m_io_service.reset();
+ tcp::acceptor acceptor(m_io_service, tcp::endpoint(
+ boost::asio::ip::address::from_string("127.0.0.1"),
+ m_listenport) );
- m_sock = socket(AF_INET, SOCK_STREAM, 0);
- if (m_sock < 0) {
- etiLog.level(error) <<
- "Error opening MGMT Server socket: " <<
- strerror(errno);
- m_fault = true;
- return;
- }
- memset(&serv_addr, 0, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1
- serv_addr.sin_port = htons(m_listenport);
+ // Add a job to start accepting connections.
+ boost::shared_ptr<tcp::socket> socket(
+ new tcp::socket(acceptor.get_io_service()));
- if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
- etiLog.level(error) <<
- "Error binding MGMT Server socket: " <<
- strerror(errno);
- goto end_serverthread;
- }
+ // Add an accept call to the service. This will prevent io_service::run()
+ // from returning.
+ etiLog.level(warn) << "MGMT: Waiting on connection";
+ acceptor.async_accept(*socket,
+ boost::bind(&ManagementServer::handle_accept,
+ this,
+ boost::asio::placeholders::error,
+ socket,
+ boost::ref(acceptor)));
- if (listen(m_sock, 5) < 0) {
- etiLog.level(error) <<
- "Error listening on MGMT Server socket: " <<
- strerror(errno);
- goto end_serverthread;
- }
+ // Process event loop.
+ m_io_service.run();
+ }
- m_running = true;
+ m_fault = true;
+}
- while (m_running) {
- socklen_t cli_addr_len = sizeof(cli_addr);
+void ManagementServer::handle_accept(
+ const boost::system::error_code& boost_error,
+ boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
+ boost::asio::ip::tcp::acceptor& acceptor)
+{
+ if (boost_error)
+ {
+ etiLog.level(error) << "MGMT: Error accepting connection";
+ return;
+ }
- /* Accept actual connection from the client */
- accepted_sock = accept(m_sock,
- (struct sockaddr *)&cli_addr,
- &cli_addr_len);
+ std::stringstream welcome_msg;
- if (accepted_sock < 0) {
- etiLog.level(warn) << "MGMT Server cound not accept connection: " <<
- strerror(errno);
- continue;
- }
- /* Send welcome message with version */
- n = write(accepted_sock, welcome_msg, welcome_msg_len);
- if (n < 0) {
- etiLog.level(warn) <<
- "MGMT: Error writing to Server socket " <<
- strerror(errno);
- close(accepted_sock);
- continue;
- }
+ welcome_msg <<
+ "{ \"service\": \"" <<
+ PACKAGE_NAME << " " <<
+#if defined(GITVERSION)
+ GITVERSION <<
+#else
+ PACKAGE_VERSION <<
+#endif
+ " MGMT Server\" }\n";
- /* receive command */
- memset(buffer, 0, 256);
- int n = read(accepted_sock, buffer, 255);
- if (n < 0) {
- etiLog.level(warn) <<
- "MGMT: Error reading from Server socket " <<
- strerror(errno);
- close(accepted_sock);
- continue;
- }
+ try {
+ etiLog.level(info) << "RC: Accepted";
- if (strcmp(buffer, "config\n") == 0) {
- std::string json = getStatConfigJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "values\n") == 0) {
- std::string json = getValuesJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "state\n") == 0) {
- std::string json = getStateJSON();
- n = write(accepted_sock, json.c_str(), json.size());
- }
- else if (strcmp(buffer, "setptree\n") == 0) {
- const ssize_t max_json_len = 32768;
- char json[max_json_len] = {'\0'};
-
- ssize_t json_len = read(accepted_sock, json, max_json_len);
-
- if (json_len < max_json_len) {
- boost::unique_lock<boost::mutex> lock(m_configmutex);
-
- std::stringstream ss;
- ss << json;
-
- m_pt.clear();
- boost::property_tree::json_parser::read_json(ss, m_pt);
- }
- else if (json_len == 0) {
- etiLog.level(warn) <<
- "MGMT: No JSON data received";
- }
- else if (json_len < 0) {
- etiLog.level(warn) <<
- "MGMT: JSON data receive error: " <<
- strerror(errno);
- }
- else {
- etiLog.level(warn) <<
- "MGMT: Received JSON too large";
- }
+ boost::system::error_code ignored_error;
- }
- else if (strcmp(buffer, "getptree\n") == 0) {
- boost::unique_lock<boost::mutex> lock(m_configmutex);
- m_pending = true;
+ boost::asio::write(*socket, boost::asio::buffer(welcome_msg.str()),
+ boost::asio::transfer_all(),
+ ignored_error);
- while (m_pending && !m_retrieve_pending) {
- m_condition.wait(lock);
- }
- std::stringstream ss;
- boost::property_tree::json_parser::write_json(ss, m_pt);
+ boost::asio::streambuf buffer;
+ size_t length =
+ boost::asio::read_until(*socket, buffer, "\n", ignored_error);
- std::string response = ss.str();
+ std::string in_message;
+ std::istream str(&buffer);
+ std::getline(str, in_message);
- n = write(accepted_sock, response.c_str(), response.size());
- }
- else {
- int len = snprintf(buffer, 256, "Invalid command\n");
- n = write(accepted_sock, buffer, len);
- }
+ if (in_message == "config") {
+ std::string json = getStatConfigJSON();
+ boost::asio::write(*socket, boost::asio::buffer(json),
+ boost::asio::transfer_all(),
+ ignored_error);
+ }
+ else if (in_message == "values") {
+ std::string json = getValuesJSON();
+ boost::asio::write(*socket, boost::asio::buffer(json),
+ boost::asio::transfer_all(),
+ ignored_error);
+ }
+ else if (in_message == "state") {
+ std::string json = getStateJSON();
- if (n < 0) {
+ boost::asio::write(*socket, boost::asio::buffer(json),
+ boost::asio::transfer_all(),
+ ignored_error);
+ }
+ else if (in_message == "setptree") {
+ boost::asio::streambuf jsonbuffer;
+ length = boost::asio::read_until(
+ *socket, jsonbuffer, "\n", ignored_error);
+
+ if (length > 0) {
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pt.clear();
+ boost::property_tree::json_parser::read_json(jsonbuffer, m_pt);
+ }
+ else if (length == 0) {
etiLog.level(warn) <<
- "Error writing to MGMT Server socket " <<
- strerror(errno);
+ "MGMT: No JSON data received";
+ }
+ else {
+ etiLog.level(error) <<
+ "MGMT: Error JSON reception";
}
- close(accepted_sock);
}
+ else if (in_message == "getptree") {
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pending = true;
-end_serverthread:
- m_fault = true;
- close(m_sock);
+ while (m_pending && !m_retrieve_pending) {
+ m_condition.wait(lock);
+ }
+ std::stringstream ss;
+ boost::property_tree::json_parser::write_json(ss, m_pt);
+ boost::asio::write(*socket, ss,
+ boost::asio::transfer_all(),
+ ignored_error);
+ }
+ else {
+ std::stringstream ss;
+ ss << "Invalid command\n";
+ boost::asio::write(*socket, ss,
+ boost::asio::transfer_all(),
+ ignored_error);
+ }
}
catch (std::exception& e) {
etiLog.level(error) <<
"MGMT server caught exception: " <<
e.what();
- m_fault = true;
}
}
diff --git a/src/ManagementServer.h b/src/ManagementServer.h
index c71f6d2..cea692f 100644
--- a/src/ManagementServer.h
+++ b/src/ManagementServer.h
@@ -52,15 +52,12 @@
# include "config.h"
#endif
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
#include <string>
#include <map>
+#include <atomic>
#include <boost/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/bind.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <ctime>
@@ -317,6 +314,7 @@ class ManagementServer
{
public:
ManagementServer() :
+ m_io_service(),
m_running(false),
m_fault(false),
m_pending(false) { }
@@ -326,10 +324,8 @@ class ManagementServer
m_running = false;
m_fault = false;
m_pending = false;
- if (m_sock) {
- close(m_sock);
- m_thread.interrupt();
- }
+
+ m_io_service.stop();
m_thread.join();
}
@@ -337,7 +333,6 @@ class ManagementServer
{
m_listenport = listenport;
if (m_listenport > 0) {
- m_sock = 0;
m_thread = boost::thread(&ManagementServer::serverThread, this);
}
}
@@ -364,6 +359,8 @@ class ManagementServer
void restart(void);
private:
+ boost::asio::io_service m_io_service;
+
void restart_thread(long);
/******* TCP Socket Server ******/
@@ -374,16 +371,19 @@ class ManagementServer
bool isInputRegistered(std::string& id);
+ void handle_accept(
+ const boost::system::error_code& boost_error,
+ boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
+ boost::asio::ip::tcp::acceptor& acceptor);
+
int m_listenport;
// serverThread runs in a separate thread
- bool m_running;
- bool m_fault;
+ std::atomic<bool> m_running;
+ std::atomic<bool> m_fault;
boost::thread m_thread;
boost::thread m_restarter_thread;
- int m_sock;
-
/******* Statistics Data ********/
std::map<std::string, InputStat*> m_inputStats;