diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/DabMux.cpp | 2 | ||||
| -rw-r--r-- | src/ManagementServer.cpp | 246 | ||||
| -rw-r--r-- | src/ManagementServer.h | 30 | 
3 files changed, 126 insertions, 152 deletions
| diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 9fdc560..972a4a2 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -451,7 +451,7 @@ int main(int argc, char *argv[])          PFT edi_pft(207, 3, edi_conf);  #endif -        ssize_t limit = pt.get("general.nbframes", 0); +        size_t limit = pt.get("general.nbframes", 0);          etiLog.level(info) << "Start loop";          /*   Each iteration of the main loop creates one ETI frame */ diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 0aa9ed8..fe1a0be 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -197,170 +197,144 @@ void ManagementServer::restart_thread(long)  void ManagementServer::serverThread()  { +    using boost::asio::ip::tcp; + +    m_running = true;      m_fault = false; -    try { -        int accepted_sock; -        char buffer[256]; -        char welcome_msg[256]; -        struct sockaddr_in serv_addr, cli_addr; -        int n; - -        int welcome_msg_len = snprintf(welcome_msg, 256, -                "{ \"service\": \"" -                "%s %s MGMT Server\" }\n", -                PACKAGE_NAME, -#if defined(GITVERSION) -                GITVERSION -#else -                PACKAGE_VERSION -#endif -                ); +    while (m_running) { +        m_io_service.reset(); +        tcp::acceptor acceptor(m_io_service, tcp::endpoint( +                    boost::asio::ip::address::from_string("127.0.0.1"), +                    m_listenport) ); -        m_sock = socket(AF_INET, SOCK_STREAM, 0); -        if (m_sock < 0) { -            etiLog.level(error) << -                "Error opening MGMT Server socket: " << -                strerror(errno); -            m_fault = true; -            return; -        } -        memset(&serv_addr, 0, sizeof(serv_addr)); -        serv_addr.sin_family = AF_INET; -        serv_addr.sin_addr.s_addr = INADDR_ANY; // TODO listen only on 127.0.0.1 -        serv_addr.sin_port = htons(m_listenport); +        // Add a job to start accepting connections. +        boost::shared_ptr<tcp::socket> socket( +                new tcp::socket(acceptor.get_io_service())); -        if (bind(m_sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { -            etiLog.level(error) << -                "Error binding MGMT Server socket: " << -                strerror(errno); -            goto end_serverthread; -        } +        // Add an accept call to the service.  This will prevent io_service::run() +        // from returning. +        etiLog.level(warn) << "MGMT: Waiting on connection"; +        acceptor.async_accept(*socket, +                boost::bind(&ManagementServer::handle_accept, +                    this, +                    boost::asio::placeholders::error, +                    socket, +                    boost::ref(acceptor))); -        if (listen(m_sock, 5) < 0) { -            etiLog.level(error) << -                "Error listening on MGMT Server socket: " << -                strerror(errno); -            goto end_serverthread; -        } +        // Process event loop. +        m_io_service.run(); +    } -        m_running = true; +    m_fault = true; +} -        while (m_running) { -            socklen_t cli_addr_len = sizeof(cli_addr); +void ManagementServer::handle_accept( +                const boost::system::error_code& boost_error, +                boost::shared_ptr< boost::asio::ip::tcp::socket > socket, +                boost::asio::ip::tcp::acceptor& acceptor) +{ +    if (boost_error) +    { +        etiLog.level(error) << "MGMT: Error accepting connection"; +        return; +    } -            /* Accept actual connection from the client */ -            accepted_sock = accept(m_sock, -                    (struct sockaddr *)&cli_addr, -                    &cli_addr_len); +    std::stringstream welcome_msg; -            if (accepted_sock < 0) { -                etiLog.level(warn) << "MGMT Server cound not accept connection: " << -                    strerror(errno); -                continue; -            } -            /* Send welcome message with version */ -            n = write(accepted_sock, welcome_msg, welcome_msg_len); -            if (n < 0) { -                etiLog.level(warn) << -                    "MGMT: Error writing to Server socket " << -                    strerror(errno); -                close(accepted_sock); -                continue; -            } +    welcome_msg << +        "{ \"service\": \"" << +        PACKAGE_NAME << " " << +#if defined(GITVERSION) +        GITVERSION << +#else +        PACKAGE_VERSION << +#endif +        " MGMT Server\" }\n"; -            /* receive command */ -            memset(buffer, 0, 256); -            int n = read(accepted_sock, buffer, 255); -            if (n < 0) { -                etiLog.level(warn) << -                    "MGMT: Error reading from Server socket " << -                    strerror(errno); -                close(accepted_sock); -                continue; -            } +    try { +        etiLog.level(info) << "RC: Accepted"; -            if (strcmp(buffer, "config\n") == 0) { -                std::string json = getStatConfigJSON(); -                n = write(accepted_sock, json.c_str(), json.size()); -            } -            else if (strcmp(buffer, "values\n") == 0) { -                std::string json = getValuesJSON(); -                n = write(accepted_sock, json.c_str(), json.size()); -            } -            else if (strcmp(buffer, "state\n") == 0) { -                std::string json = getStateJSON(); -                n = write(accepted_sock, json.c_str(), json.size()); -            } -            else if (strcmp(buffer, "setptree\n") == 0) { -                const ssize_t max_json_len = 32768; -                char json[max_json_len] = {'\0'}; - -                ssize_t json_len = read(accepted_sock, json, max_json_len); - -                if (json_len < max_json_len) { -                    boost::unique_lock<boost::mutex> lock(m_configmutex); - -                    std::stringstream ss; -                    ss << json; - -                    m_pt.clear(); -                    boost::property_tree::json_parser::read_json(ss, m_pt); -                } -                else if (json_len == 0) { -                    etiLog.level(warn) << -                        "MGMT: No JSON data received"; -                } -                else if (json_len < 0) { -                    etiLog.level(warn) << -                        "MGMT: JSON data receive error: " << -                        strerror(errno); -                } -                else { -                    etiLog.level(warn) << -                        "MGMT: Received JSON too large"; -                } +        boost::system::error_code ignored_error; -            } -            else if (strcmp(buffer, "getptree\n") == 0) { -                boost::unique_lock<boost::mutex> lock(m_configmutex); -                m_pending = true; +        boost::asio::write(*socket, boost::asio::buffer(welcome_msg.str()), +                boost::asio::transfer_all(), +                ignored_error); -                while (m_pending && !m_retrieve_pending) { -                    m_condition.wait(lock); -                } -                std::stringstream ss; -                boost::property_tree::json_parser::write_json(ss, m_pt); +        boost::asio::streambuf buffer; +        size_t length = +            boost::asio::read_until(*socket, buffer, "\n", ignored_error); -                std::string response = ss.str(); +        std::string in_message; +        std::istream str(&buffer); +        std::getline(str, in_message); -                n = write(accepted_sock, response.c_str(), response.size()); -            } -            else { -                int len = snprintf(buffer, 256, "Invalid command\n"); -                n = write(accepted_sock, buffer, len); -            } +        if (in_message == "config") { +            std::string json = getStatConfigJSON(); +            boost::asio::write(*socket, boost::asio::buffer(json), +                    boost::asio::transfer_all(), +                    ignored_error); +        } +        else if (in_message == "values") { +            std::string json = getValuesJSON(); +            boost::asio::write(*socket, boost::asio::buffer(json), +                    boost::asio::transfer_all(), +                    ignored_error); +        } +        else if (in_message == "state") { +            std::string json = getStateJSON(); -            if (n < 0) { +            boost::asio::write(*socket, boost::asio::buffer(json), +                    boost::asio::transfer_all(), +                    ignored_error); +        } +        else if (in_message == "setptree") { +            boost::asio::streambuf jsonbuffer; +            length = boost::asio::read_until( +                    *socket, jsonbuffer, "\n", ignored_error); + +            if (length > 0) { +                boost::unique_lock<boost::mutex> lock(m_configmutex); +                m_pt.clear(); +                boost::property_tree::json_parser::read_json(jsonbuffer, m_pt); +            } +            else if (length == 0) {                  etiLog.level(warn) << -                    "Error writing to MGMT Server socket " << -                    strerror(errno); +                    "MGMT: No JSON data received"; +            } +            else { +                etiLog.level(error) << +                    "MGMT: Error JSON reception";              } -            close(accepted_sock);          } +        else if (in_message == "getptree") { +            boost::unique_lock<boost::mutex> lock(m_configmutex); +            m_pending = true; -end_serverthread: -        m_fault = true; -        close(m_sock); +            while (m_pending && !m_retrieve_pending) { +                m_condition.wait(lock); +            } +            std::stringstream ss; +            boost::property_tree::json_parser::write_json(ss, m_pt); +            boost::asio::write(*socket, ss, +                    boost::asio::transfer_all(), +                    ignored_error); +        } +        else { +            std::stringstream ss; +            ss << "Invalid command\n"; +            boost::asio::write(*socket, ss, +                    boost::asio::transfer_all(), +                    ignored_error); +        }      }      catch (std::exception& e) {          etiLog.level(error) <<              "MGMT server caught exception: " <<              e.what(); -        m_fault = true;      }  } diff --git a/src/ManagementServer.h b/src/ManagementServer.h index c71f6d2..cea692f 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -52,15 +52,12 @@  #   include "config.h"  #endif -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h>  #include <string>  #include <map> +#include <atomic>  #include <boost/thread.hpp> +#include <boost/asio.hpp> +#include <boost/bind.hpp>  #include <boost/property_tree/ptree.hpp>  #include <boost/property_tree/json_parser.hpp>  #include <ctime> @@ -317,6 +314,7 @@ class ManagementServer  {      public:          ManagementServer() : +            m_io_service(),              m_running(false),              m_fault(false),              m_pending(false) { } @@ -326,10 +324,8 @@ class ManagementServer              m_running = false;              m_fault = false;              m_pending = false; -            if (m_sock) { -                close(m_sock); -                m_thread.interrupt(); -            } + +            m_io_service.stop();              m_thread.join();          } @@ -337,7 +333,6 @@ class ManagementServer          {              m_listenport = listenport;              if (m_listenport > 0) { -                m_sock = 0;                  m_thread = boost::thread(&ManagementServer::serverThread, this);              }          } @@ -364,6 +359,8 @@ class ManagementServer          void restart(void);      private: +        boost::asio::io_service m_io_service; +          void restart_thread(long);          /******* TCP Socket Server ******/ @@ -374,16 +371,19 @@ class ManagementServer          bool isInputRegistered(std::string& id); +        void handle_accept( +                const boost::system::error_code& boost_error, +                boost::shared_ptr< boost::asio::ip::tcp::socket > socket, +                boost::asio::ip::tcp::acceptor& acceptor); +          int m_listenport;          // serverThread runs in a separate thread -        bool m_running; -        bool m_fault; +        std::atomic<bool> m_running; +        std::atomic<bool> m_fault;          boost::thread m_thread;          boost::thread m_restarter_thread; -        int m_sock; -          /******* Statistics Data ********/          std::map<std::string, InputStat*> m_inputStats; | 
