diff options
| -rw-r--r-- | src/RemoteControl.cpp | 154 | ||||
| -rw-r--r-- | src/RemoteControl.h | 47 | ||||
| -rw-r--r-- | src/TcpSocket.cpp | 23 | ||||
| -rw-r--r-- | src/TcpSocket.h | 3 | ||||
| -rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 9 | ||||
| -rw-r--r-- | src/fig/FIGCarousel.cpp | 28 | ||||
| -rw-r--r-- | src/fig/FIGCarousel.h | 5 | 
7 files changed, 164 insertions, 105 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 31e63f2..fe8b7cd 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -38,6 +38,13 @@ using namespace std;  RemoteControllers rcs; +RemoteControllerTelnet::~RemoteControllerTelnet() +{ +    m_active = false; +    m_io_service.stop(); +    m_child_thread.join(); +} +  void RemoteControllerTelnet::restart()  {      m_restarter_thread = boost::thread( @@ -87,84 +94,115 @@ void RemoteControllers::set_param(  // thread.  void RemoteControllerTelnet::restart_thread(long)  { -    m_running = false; +    m_active = false; +    m_io_service.stop(); -    if (m_port) { -        m_child_thread.interrupt(); -        m_child_thread.join(); -    } +    m_child_thread.join();      m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0);  } -void RemoteControllerTelnet::process(long) +void RemoteControllerTelnet::handle_accept( +        const boost::system::error_code& boost_error, +        boost::shared_ptr< boost::asio::ip::tcp::socket > socket, +        boost::asio::ip::tcp::acceptor& acceptor)  { -    std::string m_welcome = "ODR-DabMux Remote Control CLI\n" -                            "Write 'help' for help.\n" -                            "**********\n"; -    std::string m_prompt = "> "; + +    const std::string welcome = "ODR-DabMux Remote Control CLI\n" +                                "Write 'help' for help.\n" +                                "**********\n"; +    const std::string prompt = "> ";      std::string in_message;      size_t length; -    try { -        boost::asio::io_service io_service; -        tcp::acceptor acceptor(io_service, tcp::endpoint( -                    boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); - -        while (m_running) { -            in_message = ""; +    if (boost_error) +    { +        etiLog.level(error) << "RC: Error accepting connection"; +        return; +    } -            tcp::socket socket(io_service); +    try { +        etiLog.level(info) << "RC: Accepted"; -            acceptor.accept(socket); +        boost::system::error_code ignored_error; -            boost::system::error_code ignored_error; +        boost::asio::write(*socket, boost::asio::buffer(welcome), +                boost::asio::transfer_all(), +                ignored_error); -            boost::asio::write(socket, boost::asio::buffer(m_welcome), +        while (m_active && in_message != "quit") { +            boost::asio::write(*socket, boost::asio::buffer(prompt),                      boost::asio::transfer_all(),                      ignored_error); -            while (m_running && in_message != "quit") { -                boost::asio::write(socket, boost::asio::buffer(m_prompt), -                        boost::asio::transfer_all(), -                        ignored_error); - -                in_message = ""; +            in_message = ""; -                boost::asio::streambuf buffer; -                length = boost::asio::read_until( socket, buffer, "\n", ignored_error); +            boost::asio::streambuf buffer; +            length = boost::asio::read_until(*socket, buffer, "\n", ignored_error); -                std::istream str(&buffer); -                std::getline(str, in_message); +            std::istream str(&buffer); +            std::getline(str, in_message); -                if (length == 0) { -                    std::cerr << "RC: Connection terminated" << std::endl; -                    break; -                } +            if (length == 0) { +                etiLog.level(info) << "RC: Connection terminated"; +                break; +            } -                while (in_message.length() > 0 && -                        (in_message[in_message.length()-1] == '\r' || -                         in_message[in_message.length()-1] == '\n')) { -                    in_message.erase(in_message.length()-1, 1); -                } +            while (in_message.length() > 0 && +                    (in_message[in_message.length()-1] == '\r' || +                     in_message[in_message.length()-1] == '\n')) { +                in_message.erase(in_message.length()-1, 1); +            } -                if (in_message.length() == 0) { -                    continue; -                } +            if (in_message.length() == 0) { +                continue; +            } -                std::cerr << "RC: Got message '" << in_message << "'" << std::endl; +            etiLog.level(info) << "RC: Got message '" << in_message << "'"; -                dispatch_command(socket, in_message); -            } -            std::cerr << "RC: Closing socket" << std::endl; -            socket.close(); +            dispatch_command(*socket, in_message);          } +        etiLog.level(info) << "RC: Closing socket"; +        socket->close();      } -    catch (std::exception& e) { -        std::cerr << "Remote control caught exception: " << e.what() << std::endl; -        m_fault = true; +    catch (std::exception& e) +    { +        etiLog.level(error) << "Remote control caught exception: " << e.what(); +    } +} + +void RemoteControllerTelnet::process(long) +{ +    m_active = true; + +    while (m_active) { +        m_io_service.reset(); + +        tcp::acceptor acceptor(m_io_service, tcp::endpoint( +                    boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); + + +        // Add a job to start accepting connections. +        boost::shared_ptr<tcp::socket> socket( +                new tcp::socket(acceptor.get_io_service())); + +        // Add an accept call to the service.  This will prevent io_service::run() +        // from returning. +        etiLog.level(info) << "RC: Waiting for connection on port " << m_port; +        acceptor.async_accept(*socket, +                boost::bind(&RemoteControllerTelnet::handle_accept, +                    this, +                    boost::asio::placeholders::error, +                    socket, +                    boost::ref(acceptor))); + +        // Process event loop. +        m_io_service.run();      } + +    etiLog.level(info) << "RC: Leaving"; +    m_fault = true;  }  void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) @@ -285,6 +323,16 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)  #if defined(HAVE_RC_ZEROMQ) +RemoteControllerZmq::~RemoteControllerZmq() { +    m_active = false; +    m_fault = false; + +    if (!m_endpoint.empty()) { +        m_child_thread.interrupt(); +        m_child_thread.join(); +    } +} +  void RemoteControllerZmq::restart()  {      m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); @@ -295,7 +343,7 @@ void RemoteControllerZmq::restart()  // thread.  void RemoteControllerZmq::restart_thread()  { -    m_running = false; +    m_active = false;      if (!m_endpoint.empty()) {          m_child_thread.interrupt(); @@ -354,7 +402,7 @@ void RemoteControllerZmq::process()          // create pollitem that polls the  ZMQ sockets          zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; -        for (;;) { +        while (m_active) {              zmq::poll(pollItems, 1, 100);              std::vector<std::string> msg; diff --git a/src/RemoteControl.h b/src/RemoteControl.h index fe8ac42..d8b3b6b 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -199,25 +199,25 @@ extern RemoteControllers rcs;  class RemoteControllerTelnet : public BaseRemoteController {      public:          RemoteControllerTelnet() -            : m_running(false), m_fault(false), +            : m_active(false), +            m_io_service(), +            m_fault(false),              m_port(0) { }          RemoteControllerTelnet(int port) -            : m_running(true), m_fault(false), -            m_child_thread(&RemoteControllerTelnet::process, this, 0), -            m_port(port) { } +            : m_active(port > 0), +            m_io_service(), +            m_fault(false), +            m_port(port) +        { +            restart(); +        } +          RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete;          RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; -        ~RemoteControllerTelnet() { -            m_running = false; -            m_fault = false; -            if (m_port) { -                m_child_thread.interrupt(); -                m_child_thread.join(); -            } -        } +        ~RemoteControllerTelnet();          virtual bool fault_detected() { return m_fault; } @@ -233,6 +233,10 @@ class RemoteControllerTelnet : public BaseRemoteController {          void reply(boost::asio::ip::tcp::socket& socket, std::string message); +        void handle_accept( +                const boost::system::error_code& boost_error, +                boost::shared_ptr< boost::asio::ip::tcp::socket > socket, +                boost::asio::ip::tcp::acceptor& acceptor);          std::vector<std::string> tokenise_(std::string message) {              std::vector<std::string> all_tokens; @@ -244,7 +248,9 @@ class RemoteControllerTelnet : public BaseRemoteController {              return all_tokens;          } -        std::atomic<bool> m_running; +        std::atomic<bool> m_active; + +        boost::asio::io_service m_io_service;          /* This is set to true if a fault occurred */          std::atomic<bool> m_fault; @@ -262,12 +268,12 @@ class RemoteControllerTelnet : public BaseRemoteController {  class RemoteControllerZmq : public BaseRemoteController {      public:          RemoteControllerZmq() -            : m_running(false), m_fault(false), +            : m_active(false), m_fault(false),              m_zmqContext(1),              m_endpoint("") { }          RemoteControllerZmq(const std::string& endpoint) -            : m_running(true), m_fault(false), +            : m_active(not endpoint.empty()), m_fault(false),              m_zmqContext(1),              m_endpoint(endpoint),              m_child_thread(&RemoteControllerZmq::process, this) { } @@ -275,14 +281,7 @@ class RemoteControllerZmq : public BaseRemoteController {          RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete;          RemoteControllerZmq(const RemoteControllerZmq& other) = delete; -        ~RemoteControllerZmq() { -            m_running = false; -            m_fault = false; -            if (!m_endpoint.empty()) { -                m_child_thread.interrupt(); -                m_child_thread.join(); -            } -        } +        ~RemoteControllerZmq();          virtual bool fault_detected() { return m_fault; } @@ -296,7 +295,7 @@ class RemoteControllerZmq : public BaseRemoteController {          void send_fail_reply(zmq::socket_t &pSocket, const std::string &error);          void process(); -        std::atomic<bool> m_running; +        std::atomic<bool> m_active;          /* This is set to true if a fault occurred */          std::atomic<bool> m_fault; diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 13efece..433e5c1 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -32,6 +32,7 @@  #include <string.h>  #include <signal.h>  #include <stdint.h> +#include <poll.h>  using namespace std; @@ -166,6 +167,28 @@ TcpSocket TcpSocket::accept()      }  } +boost::optional<TcpSocket> TcpSocket::accept(int timeout_ms) +{ +    struct pollfd fds[1]; +    fds[0].fd = m_sock; +    fds[0].events = POLLIN | POLLOUT; + +    int retval = poll(fds, 1, timeout_ms); + +    if (retval == -1) { +        stringstream ss; +        ss << "TCP Socket accept error: " << strerror(errno); +        throw std::runtime_error(ss.str()); +    } +    else if (retval) { +        return accept(); +    } +    else { +        return boost::none; +    } +} + +  InetAddress TcpSocket::getOwnAddress() const  {      return m_own_address; diff --git a/src/TcpSocket.h b/src/TcpSocket.h index f1354a7..5a4a808 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -46,6 +46,8 @@  #include <iostream>  #include <string> +#include <boost/optional.hpp> +  /**   *  This class represents a TCP socket.   */ @@ -84,6 +86,7 @@ class TcpSocket          void listen(void);          TcpSocket accept(void); +        boost::optional<TcpSocket> accept(int timeout_ms);          /** Retrieve address this socket is bound to */          InetAddress getOwnAddress() const; diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 2c5a067..8696bec 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -128,9 +128,14 @@ class TCPDataDispatcher          void process(long) {              m_listener_socket.listen(); +            const int timeout_ms = 1000; +              while (m_running) {                  // Add a new TCPConnection to the list, constructing it from the client socket -                m_connections.emplace(m_connections.begin(), m_listener_socket.accept()); +                auto optional_sock = m_listener_socket.accept(timeout_ms); +                if (optional_sock) { +                    m_connections.emplace(m_connections.begin(), std::move(*optional_sock)); +                }              }          } @@ -191,6 +196,8 @@ int DabOutputTcp::Open(const char* name)      string address;      bool success = parse_uri(name, &port, address); +    uri_ = name; +      if (success) {          dispatcher_ = new TCPDataDispatcher();          try { diff --git a/src/fig/FIGCarousel.cpp b/src/fig/FIGCarousel.cpp index bd2bf51..ac2a80b 100644 --- a/src/fig/FIGCarousel.cpp +++ b/src/fig/FIGCarousel.cpp @@ -113,11 +113,11 @@ FIGCarousel::FIGCarousel(std::shared_ptr<dabEnsemble> ensemble) :  void FIGCarousel::load_and_allocate(IFIG& fig, FIBAllocation fib)  { -    int type = fig.figtype(); -    int extension = fig.figextension(); - -    m_figs_available[std::make_pair(type, extension)] = &fig; -    allocate_fig_to_fib(type, extension, fib); +    FIGCarouselElement el; +    el.fig = &fig; +    el.deadline = 0; +    el.increase_deadline(); +    m_fibs[fib].push_back(el);  }  void FIGCarousel::update(unsigned long currentFrame) @@ -125,24 +125,6 @@ void FIGCarousel::update(unsigned long currentFrame)      m_rti.currentFrame = currentFrame;  } -void FIGCarousel::allocate_fig_to_fib(int figtype, int extension, FIBAllocation fib) -{ -    auto fig = m_figs_available.find(std::make_pair(figtype, extension)); - -    if (fig != m_figs_available.end()) { -        FIGCarouselElement el; -        el.fig = fig->second; -        el.deadline = 0; -        el.increase_deadline(); -        m_fibs[fib].push_back(el); -    } -    else { -        std::stringstream ss; -        ss << "No FIG " << figtype << "/" << extension << " available"; -        throw std::runtime_error(ss.str()); -    } -} -  void dumpfib(const uint8_t *buf, size_t bufsize) {      std::cerr << "FIB ";      for (size_t i = 0; i < bufsize; i++) { diff --git a/src/fig/FIGCarousel.h b/src/fig/FIGCarousel.h index be0b23f..f52f266 100644 --- a/src/fig/FIGCarousel.h +++ b/src/fig/FIGCarousel.h @@ -3,7 +3,7 @@     2011, 2012 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2015 +   Copyright (C) 2016     Matthias P. Braendli, matthias.braendli@mpb.li     Implementation of the FIG carousel to schedule the FIGs into the @@ -62,8 +62,6 @@ class FIGCarousel {          void update(unsigned long currentFrame); -        void allocate_fig_to_fib(int figtype, int extension, FIBAllocation fib); -          /* Write all FIBs to the buffer, including correct padding and crc.           * Returns number of bytes written.           * @@ -81,7 +79,6 @@ class FIGCarousel {          void load_and_allocate(IFIG& fig, FIBAllocation fib);          FIGRuntimeInformation m_rti; -        std::map<std::pair<int, int>, IFIG*> m_figs_available;          // Some FIGs can be mapped to a specific FIB or to FIB_ANY          std::map<FIBAllocation, std::list<FIGCarouselElement> > m_fibs;  | 
