diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/RemoteControl.cpp | 155 | ||||
-rw-r--r-- | src/RemoteControl.h | 41 |
2 files changed, 121 insertions, 75 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 305334b..fe8b7cd 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -3,8 +3,10 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -36,6 +38,13 @@ using namespace std; RemoteControllers rcs; +RemoteControllerTelnet::~RemoteControllerTelnet() +{ + m_active = false; + m_io_service.stop(); + m_child_thread.join(); +} + void RemoteControllerTelnet::restart() { m_restarter_thread = boost::thread( @@ -85,85 +94,115 @@ void RemoteControllers::set_param( // thread. void RemoteControllerTelnet::restart_thread(long) { - if (m_port) { - m_child_thread.interrupt(); - m_child_thread.join(); - } + m_active = false; + m_io_service.stop(); - m_fault = false; + m_child_thread.join(); m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0); } -void RemoteControllerTelnet::process(long) +void RemoteControllerTelnet::handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor) { - std::string m_welcome = "ODR-DabMux Remote Control CLI\n" - "Write 'help' for help.\n" - "**********\n"; - std::string m_prompt = "> "; + + const std::string welcome = "ODR-DabMux Remote Control CLI\n" + "Write 'help' for help.\n" + "**********\n"; + const std::string prompt = "> "; std::string in_message; size_t length; - try { - boost::asio::io_service io_service; - tcp::acceptor acceptor(io_service, tcp::endpoint( - boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); - - while (m_active) { - in_message = ""; + if (boost_error) + { + etiLog.level(error) << "RC: Error accepting connection"; + return; + } - tcp::socket socket(io_service); + try { + etiLog.level(info) << "RC: Accepted"; - acceptor.accept(socket); + boost::system::error_code ignored_error; - boost::system::error_code ignored_error; + boost::asio::write(*socket, boost::asio::buffer(welcome), + boost::asio::transfer_all(), + ignored_error); - boost::asio::write(socket, boost::asio::buffer(m_welcome), + while (m_active && in_message != "quit") { + boost::asio::write(*socket, boost::asio::buffer(prompt), boost::asio::transfer_all(), ignored_error); - while (m_active && in_message != "quit") { - boost::asio::write(socket, boost::asio::buffer(m_prompt), - boost::asio::transfer_all(), - ignored_error); - - in_message = ""; + in_message = ""; - boost::asio::streambuf buffer; - length = boost::asio::read_until( socket, buffer, "\n", ignored_error); + boost::asio::streambuf buffer; + length = boost::asio::read_until(*socket, buffer, "\n", ignored_error); - std::istream str(&buffer); - std::getline(str, in_message); + std::istream str(&buffer); + std::getline(str, in_message); - if (length == 0) { - std::cerr << "RC: Connection terminated" << std::endl; - break; - } + if (length == 0) { + etiLog.level(info) << "RC: Connection terminated"; + break; + } - while (in_message.length() > 0 && - (in_message[in_message.length()-1] == '\r' || - in_message[in_message.length()-1] == '\n')) { - in_message.erase(in_message.length()-1, 1); - } + while (in_message.length() > 0 && + (in_message[in_message.length()-1] == '\r' || + in_message[in_message.length()-1] == '\n')) { + in_message.erase(in_message.length()-1, 1); + } - if (in_message.length() == 0) { - continue; - } + if (in_message.length() == 0) { + continue; + } - std::cerr << "RC: Got message '" << in_message << "'" << std::endl; + etiLog.level(info) << "RC: Got message '" << in_message << "'"; - dispatch_command(socket, in_message); - } - std::cerr << "RC: Closing socket" << std::endl; - socket.close(); + dispatch_command(*socket, in_message); } + etiLog.level(info) << "RC: Closing socket"; + socket->close(); } - catch (std::exception& e) { - etiLog.level(error) << - "Remote control caught exception: " << e.what(); - m_fault = true; + catch (std::exception& e) + { + etiLog.level(error) << "Remote control caught exception: " << e.what(); + } +} + +void RemoteControllerTelnet::process(long) +{ + m_active = true; + + while (m_active) { + m_io_service.reset(); + + tcp::acceptor acceptor(m_io_service, tcp::endpoint( + boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); + + + // Add a job to start accepting connections. + boost::shared_ptr<tcp::socket> socket( + new tcp::socket(acceptor.get_io_service())); + + // Add an accept call to the service. This will prevent io_service::run() + // from returning. + etiLog.level(info) << "RC: Waiting for connection on port " << m_port; + acceptor.async_accept(*socket, + boost::bind(&RemoteControllerTelnet::handle_accept, + this, + boost::asio::placeholders::error, + socket, + boost::ref(acceptor))); + + // Process event loop. + m_io_service.run(); } + + etiLog.level(info) << "RC: Leaving"; + m_fault = true; } void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) @@ -284,6 +323,16 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) #if defined(HAVE_RC_ZEROMQ) +RemoteControllerZmq::~RemoteControllerZmq() { + m_active = false; + m_fault = false; + + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } +} + void RemoteControllerZmq::restart() { m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); diff --git a/src/RemoteControl.h b/src/RemoteControl.h index da6f9ea..1c830aa 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -198,25 +198,25 @@ extern RemoteControllers rcs; class RemoteControllerTelnet : public BaseRemoteController { public: RemoteControllerTelnet() - : m_active(false), m_fault(false), + : m_active(false), + m_io_service(), + m_fault(false), m_port(0) { } RemoteControllerTelnet(int port) - : m_active(port > 0), m_fault(false), - m_child_thread(&RemoteControllerTelnet::process, this, 0), - m_port(port) { } + : m_active(port > 0), + m_io_service(), + m_fault(false), + m_port(port) + { + restart(); + } + RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - ~RemoteControllerTelnet() { - m_active = false; - m_fault = false; - if (m_port) { - m_child_thread.interrupt(); - m_child_thread.join(); - } - } + ~RemoteControllerTelnet(); virtual bool fault_detected() { return m_fault; } @@ -232,6 +232,10 @@ class RemoteControllerTelnet : public BaseRemoteController { void reply(boost::asio::ip::tcp::socket& socket, std::string message); + void handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor); std::vector<std::string> tokenise_(std::string message) { std::vector<std::string> all_tokens; @@ -245,6 +249,8 @@ class RemoteControllerTelnet : public BaseRemoteController { std::atomic<bool> m_active; + boost::asio::io_service m_io_service; + /* This is set to true if a fault occurred */ std::atomic<bool> m_fault; boost::thread m_restarter_thread; @@ -274,16 +280,7 @@ class RemoteControllerZmq : public BaseRemoteController { RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; RemoteControllerZmq(const RemoteControllerZmq& other) = delete; - ~RemoteControllerZmq() { - m_active = false; - m_fault = false; - - m_zmqContext.close(); - if (!m_endpoint.empty()) { - m_child_thread.interrupt(); - m_child_thread.join(); - } - } + ~RemoteControllerZmq(); virtual bool fault_detected() { return m_fault; } |