diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-10-10 11:29:04 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-10-10 11:29:04 +0200 |
commit | f91273f00205b46e86514bed19fba7c43491af44 (patch) | |
tree | c877d5d0081d6d3a49a654ddb8a99cf9bfd0b366 /src | |
parent | b455a74818b35566f4bf524e4824c000bcf3194f (diff) | |
download | dabmod-f91273f00205b46e86514bed19fba7c43491af44.tar.gz dabmod-f91273f00205b46e86514bed19fba7c43491af44.tar.bz2 dabmod-f91273f00205b46e86514bed19fba7c43491af44.zip |
Port some RC changes from ODR-DabMux
Diffstat (limited to 'src')
-rw-r--r-- | src/RemoteControl.cpp | 169 | ||||
-rw-r--r-- | src/RemoteControl.h | 55 |
2 files changed, 142 insertions, 82 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index a053431..ceae942 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -38,6 +38,13 @@ using namespace std; RemoteControllers rcs; +RemoteControllerTelnet::~RemoteControllerTelnet() +{ + m_active = false; + m_io_service.stop(); + m_child_thread.join(); +} + void RemoteControllerTelnet::restart() { m_restarter_thread = boost::thread( @@ -57,7 +64,9 @@ std::list<std::string> RemoteControllable::get_supported_parameters() const { return parameterlist; } -RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) { +RemoteControllable* RemoteControllers::get_controllable_( + const std::string& name) +{ auto rc = std::find_if(controllables.begin(), controllables.end(), [&](RemoteControllable* r) { return r->get_rc_name() == name; }); @@ -69,89 +78,131 @@ RemoteControllable* RemoteControllers::get_controllable_(const std::string& name } } +void RemoteControllers::set_param( + const std::string& name, + const std::string& param, + const std::string& value) +{ + etiLog.level(info) << "RC: Setting " << name << " " << param + << " to " << value; + RemoteControllable* controllable = get_controllable_(name); + return controllable->set_parameter(param, value); +} + // This runs in a separate thread, because // it would take too long to be done in the main loop // thread. void RemoteControllerTelnet::restart_thread(long) { - m_running = false; + m_active = false; + m_io_service.stop(); - if (m_port) { - m_child_thread.interrupt(); - m_child_thread.join(); - } + m_child_thread.join(); m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0); } -void RemoteControllerTelnet::process(long) +void RemoteControllerTelnet::handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor) { - std::string m_welcome = "ODR-DabMod Remote Control CLI\n" - "Write 'help' for help.\n" - "**********\n"; - std::string m_prompt = "> "; + + const std::string welcome = "ODR-DabMux Remote Control CLI\n" + "Write 'help' for help.\n" + "**********\n"; + const std::string prompt = "> "; std::string in_message; size_t length; - try { - boost::asio::io_service io_service; - tcp::acceptor acceptor(io_service, tcp::endpoint( - boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); - - while (m_running) { - in_message = ""; + if (boost_error) + { + etiLog.level(error) << "RC: Error accepting connection"; + return; + } - tcp::socket socket(io_service); + try { + etiLog.level(info) << "RC: Accepted"; - acceptor.accept(socket); + boost::system::error_code ignored_error; - boost::system::error_code ignored_error; + boost::asio::write(*socket, boost::asio::buffer(welcome), + boost::asio::transfer_all(), + ignored_error); - boost::asio::write(socket, boost::asio::buffer(m_welcome), + while (m_active && in_message != "quit") { + boost::asio::write(*socket, boost::asio::buffer(prompt), boost::asio::transfer_all(), ignored_error); - while (m_running && in_message != "quit") { - boost::asio::write(socket, boost::asio::buffer(m_prompt), - boost::asio::transfer_all(), - ignored_error); - - in_message = ""; + in_message = ""; - boost::asio::streambuf buffer; - length = boost::asio::read_until( socket, buffer, "\n", ignored_error); + boost::asio::streambuf buffer; + length = boost::asio::read_until(*socket, buffer, "\n", ignored_error); - std::istream str(&buffer); - std::getline(str, in_message); + std::istream str(&buffer); + std::getline(str, in_message); - if (length == 0) { - std::cerr << "RC: Connection terminated" << std::endl; - break; - } + if (length == 0) { + etiLog.level(info) << "RC: Connection terminated"; + break; + } - while (in_message.length() > 0 && - (in_message[in_message.length()-1] == '\r' || - in_message[in_message.length()-1] == '\n')) { - in_message.erase(in_message.length()-1, 1); - } + while (in_message.length() > 0 && + (in_message[in_message.length()-1] == '\r' || + in_message[in_message.length()-1] == '\n')) { + in_message.erase(in_message.length()-1, 1); + } - if (in_message.length() == 0) { - continue; - } + if (in_message.length() == 0) { + continue; + } - std::cerr << "RC: Got message '" << in_message << "'" << std::endl; + etiLog.level(info) << "RC: Got message '" << in_message << "'"; - dispatch_command(socket, in_message); - } - std::cerr << "RC: Closing socket" << std::endl; - socket.close(); + dispatch_command(*socket, in_message); } + etiLog.level(info) << "RC: Closing socket"; + socket->close(); } - catch (std::exception& e) { - std::cerr << "Remote control caught exception: " << e.what() << std::endl; - m_fault = true; + catch (std::exception& e) + { + etiLog.level(error) << "Remote control caught exception: " << e.what(); + } +} + +void RemoteControllerTelnet::process(long) +{ + m_active = true; + + while (m_active) { + m_io_service.reset(); + + tcp::acceptor acceptor(m_io_service, tcp::endpoint( + boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); + + + // Add a job to start accepting connections. + boost::shared_ptr<tcp::socket> socket( + new tcp::socket(acceptor.get_io_service())); + + // Add an accept call to the service. This will prevent io_service::run() + // from returning. + etiLog.level(info) << "RC: Waiting for connection on port " << m_port; + acceptor.async_accept(*socket, + boost::bind(&RemoteControllerTelnet::handle_accept, + this, + boost::asio::placeholders::error, + socket, + boost::ref(acceptor))); + + // Process event loop. + m_io_service.run(); } + + etiLog.level(info) << "RC: Leaving"; + m_fault = true; } void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) @@ -272,6 +323,16 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) #if defined(HAVE_ZEROMQ) +RemoteControllerZmq::~RemoteControllerZmq() { + m_active = false; + m_fault = false; + + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } +} + void RemoteControllerZmq::restart() { m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); @@ -282,7 +343,7 @@ void RemoteControllerZmq::restart() // thread. void RemoteControllerZmq::restart_thread() { - m_running = false; + m_active = false; if (!m_endpoint.empty()) { m_child_thread.interrupt(); @@ -341,7 +402,7 @@ void RemoteControllerZmq::process() // create pollitem that polls the ZMQ sockets zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - for (;;) { + while (m_active) { zmq::poll(pollItems, 1, 100); std::vector<std::string> msg; diff --git a/src/RemoteControl.h b/src/RemoteControl.h index e4345de..013738b 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -177,10 +177,10 @@ class RemoteControllers { return controllable->get_parameter(param); } - void set_param(const std::string& name, const std::string& param, const std::string& value) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->set_parameter(param, value); - } + void set_param( + const std::string& name, + const std::string& param, + const std::string& value); std::list<RemoteControllable*> controllables; @@ -199,25 +199,25 @@ extern RemoteControllers rcs; class RemoteControllerTelnet : public BaseRemoteController { public: RemoteControllerTelnet() - : m_running(false), m_fault(false), + : m_active(false), + m_io_service(), + m_fault(false), m_port(0) { } RemoteControllerTelnet(int port) - : m_running(true), m_fault(false), - m_child_thread(&RemoteControllerTelnet::process, this, 0), - m_port(port) { } + : m_active(port > 0), + m_io_service(), + m_fault(false), + m_port(port) + { + restart(); + } + RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - ~RemoteControllerTelnet() { - m_running = false; - m_fault = false; - if (m_port) { - m_child_thread.interrupt(); - m_child_thread.join(); - } - } + ~RemoteControllerTelnet(); virtual bool fault_detected() { return m_fault; } @@ -233,6 +233,10 @@ class RemoteControllerTelnet : public BaseRemoteController { void reply(boost::asio::ip::tcp::socket& socket, std::string message); + void handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor); std::vector<std::string> tokenise_(std::string message) { std::vector<std::string> all_tokens; @@ -244,7 +248,9 @@ class RemoteControllerTelnet : public BaseRemoteController { return all_tokens; } - std::atomic<bool> m_running; + std::atomic<bool> m_active; + + boost::asio::io_service m_io_service; /* This is set to true if a fault occurred */ std::atomic<bool> m_fault; @@ -262,12 +268,12 @@ class RemoteControllerTelnet : public BaseRemoteController { class RemoteControllerZmq : public BaseRemoteController { public: RemoteControllerZmq() - : m_running(false), m_fault(false), + : m_active(false), m_fault(false), m_zmqContext(1), m_endpoint("") { } RemoteControllerZmq(const std::string& endpoint) - : m_running(true), m_fault(false), + : m_active(not endpoint.empty()), m_fault(false), m_zmqContext(1), m_endpoint(endpoint), m_child_thread(&RemoteControllerZmq::process, this) { } @@ -275,14 +281,7 @@ class RemoteControllerZmq : public BaseRemoteController { RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; RemoteControllerZmq(const RemoteControllerZmq& other) = delete; - ~RemoteControllerZmq() { - m_running = false; - m_fault = false; - if (!m_endpoint.empty()) { - m_child_thread.interrupt(); - m_child_thread.join(); - } - } + ~RemoteControllerZmq(); virtual bool fault_detected() { return m_fault; } @@ -296,7 +295,7 @@ class RemoteControllerZmq : public BaseRemoteController { void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); void process(); - std::atomic<bool> m_running; + std::atomic<bool> m_active; /* This is set to true if a fault occurred */ std::atomic<bool> m_fault; |