From 93cbefeeffcba044ab95ee46307ab50ec717bf2b Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 22 Mar 2015 21:25:46 +0100 Subject: Use boost::asio in Management Server --- src/DabMux.cpp | 2 +- src/ManagementServer.cpp | 246 +++++++++++++++++++++-------------------------- src/ManagementServer.h | 30 +++--- 3 files changed, 126 insertions(+), 152 deletions(-) diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 9fdc560..972a4a2 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -451,7 +451,7 @@ int main(int argc, char *argv[]) PFT edi_pft(207, 3, edi_conf); #endif - ssize_t limit = pt.get("general.nbframes", 0); + size_t limit = pt.get("general.nbframes", 0); etiLog.level(info) << "Start loop"; /* Each iteration of the main loop creates one ETI frame */ diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 0aa9ed8..fe1a0be 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -197,170 +197,144 @@ void ManagementServer::restart_thread(long) void ManagementServer::serverThread() { + using boost::asio::ip::tcp; + + m_running = true; m_fault = false; - try { - int accepted_sock; - char buffer[256]; - char welcome_msg[256]; - struct sockaddr_in serv_addr, cli_addr; - int n; - - int welcome_msg_len = snprintf(welcome_msg, 256, - "{ \"service\": \"" - "%s %s MGMT Server\" }\n", - PACKAGE_NAME, -#if defined(GITVERSION) - GITVERSION -#else - PACKAGE_VERSION -#endif - ); + while (m_running) { + m_io_service.reset(); + tcp::acceptor acceptor(m_io_service, tcp::endpoint( + boost::asio::ip::address::from_string("127.0.0.1"), + m_listenport) ); - m_sock = socket(AF_INET, SOCK_STREAM, 0); - if (m_sock < 0) { - etiLog.level(error) << - "Error opening MGMT Server socket: " << - strerror(errno); - m_fault = true; - return; - } - memset(&serv_addr, 0, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 - serv_addr.sin_port = htons(m_listenport); + // Add a job to start accepting connections. + boost::shared_ptr socket( + new tcp::socket(acceptor.get_io_service())); - if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - etiLog.level(error) << - "Error binding MGMT Server socket: " << - strerror(errno); - goto end_serverthread; - } + // Add an accept call to the service. This will prevent io_service::run() + // from returning. + etiLog.level(warn) << "MGMT: Waiting on connection"; + acceptor.async_accept(*socket, + boost::bind(&ManagementServer::handle_accept, + this, + boost::asio::placeholders::error, + socket, + boost::ref(acceptor))); - if (listen(m_sock, 5) < 0) { - etiLog.level(error) << - "Error listening on MGMT Server socket: " << - strerror(errno); - goto end_serverthread; - } + // Process event loop. + m_io_service.run(); + } - m_running = true; + m_fault = true; +} - while (m_running) { - socklen_t cli_addr_len = sizeof(cli_addr); +void ManagementServer::handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor) +{ + if (boost_error) + { + etiLog.level(error) << "MGMT: Error accepting connection"; + return; + } - /* Accept actual connection from the client */ - accepted_sock = accept(m_sock, - (struct sockaddr *)&cli_addr, - &cli_addr_len); + std::stringstream welcome_msg; - if (accepted_sock < 0) { - etiLog.level(warn) << "MGMT Server cound not accept connection: " << - strerror(errno); - continue; - } - /* Send welcome message with version */ - n = write(accepted_sock, welcome_msg, welcome_msg_len); - if (n < 0) { - etiLog.level(warn) << - "MGMT: Error writing to Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } + welcome_msg << + "{ \"service\": \"" << + PACKAGE_NAME << " " << +#if defined(GITVERSION) + GITVERSION << +#else + PACKAGE_VERSION << +#endif + " MGMT Server\" }\n"; - /* receive command */ - memset(buffer, 0, 256); - int n = read(accepted_sock, buffer, 255); - if (n < 0) { - etiLog.level(warn) << - "MGMT: Error reading from Server socket " << - strerror(errno); - close(accepted_sock); - continue; - } + try { + etiLog.level(info) << "RC: Accepted"; - if (strcmp(buffer, "config\n") == 0) { - std::string json = getStatConfigJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "values\n") == 0) { - std::string json = getValuesJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "state\n") == 0) { - std::string json = getStateJSON(); - n = write(accepted_sock, json.c_str(), json.size()); - } - else if (strcmp(buffer, "setptree\n") == 0) { - const ssize_t max_json_len = 32768; - char json[max_json_len] = {'\0'}; - - ssize_t json_len = read(accepted_sock, json, max_json_len); - - if (json_len < max_json_len) { - boost::unique_lock lock(m_configmutex); - - std::stringstream ss; - ss << json; - - m_pt.clear(); - boost::property_tree::json_parser::read_json(ss, m_pt); - } - else if (json_len == 0) { - etiLog.level(warn) << - "MGMT: No JSON data received"; - } - else if (json_len < 0) { - etiLog.level(warn) << - "MGMT: JSON data receive error: " << - strerror(errno); - } - else { - etiLog.level(warn) << - "MGMT: Received JSON too large"; - } + boost::system::error_code ignored_error; - } - else if (strcmp(buffer, "getptree\n") == 0) { - boost::unique_lock lock(m_configmutex); - m_pending = true; + boost::asio::write(*socket, boost::asio::buffer(welcome_msg.str()), + boost::asio::transfer_all(), + ignored_error); - while (m_pending && !m_retrieve_pending) { - m_condition.wait(lock); - } - std::stringstream ss; - boost::property_tree::json_parser::write_json(ss, m_pt); + boost::asio::streambuf buffer; + size_t length = + boost::asio::read_until(*socket, buffer, "\n", ignored_error); - std::string response = ss.str(); + std::string in_message; + std::istream str(&buffer); + std::getline(str, in_message); - n = write(accepted_sock, response.c_str(), response.size()); - } - else { - int len = snprintf(buffer, 256, "Invalid command\n"); - n = write(accepted_sock, buffer, len); - } + if (in_message == "config") { + std::string json = getStatConfigJSON(); + boost::asio::write(*socket, boost::asio::buffer(json), + boost::asio::transfer_all(), + ignored_error); + } + else if (in_message == "values") { + std::string json = getValuesJSON(); + boost::asio::write(*socket, boost::asio::buffer(json), + boost::asio::transfer_all(), + ignored_error); + } + else if (in_message == "state") { + std::string json = getStateJSON(); - if (n < 0) { + boost::asio::write(*socket, boost::asio::buffer(json), + boost::asio::transfer_all(), + ignored_error); + } + else if (in_message == "setptree") { + boost::asio::streambuf jsonbuffer; + length = boost::asio::read_until( + *socket, jsonbuffer, "\n", ignored_error); + + if (length > 0) { + boost::unique_lock lock(m_configmutex); + m_pt.clear(); + boost::property_tree::json_parser::read_json(jsonbuffer, m_pt); + } + else if (length == 0) { etiLog.level(warn) << - "Error writing to MGMT Server socket " << - strerror(errno); + "MGMT: No JSON data received"; + } + else { + etiLog.level(error) << + "MGMT: Error JSON reception"; } - close(accepted_sock); } + else if (in_message == "getptree") { + boost::unique_lock lock(m_configmutex); + m_pending = true; -end_serverthread: - m_fault = true; - close(m_sock); + while (m_pending && !m_retrieve_pending) { + m_condition.wait(lock); + } + std::stringstream ss; + boost::property_tree::json_parser::write_json(ss, m_pt); + boost::asio::write(*socket, ss, + boost::asio::transfer_all(), + ignored_error); + } + else { + std::stringstream ss; + ss << "Invalid command\n"; + boost::asio::write(*socket, ss, + boost::asio::transfer_all(), + ignored_error); + } } catch (std::exception& e) { etiLog.level(error) << "MGMT server caught exception: " << e.what(); - m_fault = true; } } diff --git a/src/ManagementServer.h b/src/ManagementServer.h index c71f6d2..cea692f 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -52,15 +52,12 @@ # include "config.h" #endif -#include -#include -#include -#include -#include -#include #include #include +#include #include +#include +#include #include #include #include @@ -317,6 +314,7 @@ class ManagementServer { public: ManagementServer() : + m_io_service(), m_running(false), m_fault(false), m_pending(false) { } @@ -326,10 +324,8 @@ class ManagementServer m_running = false; m_fault = false; m_pending = false; - if (m_sock) { - close(m_sock); - m_thread.interrupt(); - } + + m_io_service.stop(); m_thread.join(); } @@ -337,7 +333,6 @@ class ManagementServer { m_listenport = listenport; if (m_listenport > 0) { - m_sock = 0; m_thread = boost::thread(&ManagementServer::serverThread, this); } } @@ -364,6 +359,8 @@ class ManagementServer void restart(void); private: + boost::asio::io_service m_io_service; + void restart_thread(long); /******* TCP Socket Server ******/ @@ -374,16 +371,19 @@ class ManagementServer bool isInputRegistered(std::string& id); + void handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor); + int m_listenport; // serverThread runs in a separate thread - bool m_running; - bool m_fault; + std::atomic m_running; + std::atomic m_fault; boost::thread m_thread; boost::thread m_restarter_thread; - int m_sock; - /******* Statistics Data ********/ std::map m_inputStats; -- cgit v1.2.3