From eccfa8ad3774205a929ff70090540d24674618a1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 10 Oct 2016 08:44:54 +0200 Subject: Simplify FIG allocation code in carousel --- src/fig/FIGCarousel.cpp | 28 +++++----------------------- src/fig/FIGCarousel.h | 5 +---- 2 files changed, 6 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/fig/FIGCarousel.cpp b/src/fig/FIGCarousel.cpp index cbea12b..63226e0 100644 --- a/src/fig/FIGCarousel.cpp +++ b/src/fig/FIGCarousel.cpp @@ -111,11 +111,11 @@ FIGCarousel::FIGCarousel(std::shared_ptr ensemble) : void FIGCarousel::load_and_allocate(IFIG& fig, FIBAllocation fib) { - int type = fig.figtype(); - int extension = fig.figextension(); - - m_figs_available[std::make_pair(type, extension)] = &fig; - allocate_fig_to_fib(type, extension, fib); + FIGCarouselElement el; + el.fig = &fig; + el.deadline = 0; + el.increase_deadline(); + m_fibs[fib].push_back(el); } void FIGCarousel::update(unsigned long currentFrame) @@ -123,24 +123,6 @@ void FIGCarousel::update(unsigned long currentFrame) m_rti.currentFrame = currentFrame; } -void FIGCarousel::allocate_fig_to_fib(int figtype, int extension, FIBAllocation fib) -{ - auto fig = m_figs_available.find(std::make_pair(figtype, extension)); - - if (fig != m_figs_available.end()) { - FIGCarouselElement el; - el.fig = fig->second; - el.deadline = 0; - el.increase_deadline(); - m_fibs[fib].push_back(el); - } - else { - std::stringstream ss; - ss << "No FIG " << figtype << "/" << extension << " available"; - throw std::runtime_error(ss.str()); - } -} - void dumpfib(const uint8_t *buf, size_t bufsize) { std::cerr << "FIB "; for (size_t i = 0; i < bufsize; i++) { diff --git a/src/fig/FIGCarousel.h b/src/fig/FIGCarousel.h index 67772a6..b46f4a6 100644 --- a/src/fig/FIGCarousel.h +++ b/src/fig/FIGCarousel.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li Implementation of the FIG carousel to schedule the FIGs into the @@ -62,8 +62,6 @@ class FIGCarousel { void update(unsigned long currentFrame); - void allocate_fig_to_fib(int figtype, int extension, FIBAllocation fib); - /* Write all FIBs to the buffer, including correct padding and crc. * Returns number of bytes written. * @@ -81,7 +79,6 @@ class FIGCarousel { void load_and_allocate(IFIG& fig, FIBAllocation fib); FIGRuntimeInformation m_rti; - std::map, IFIG*> m_figs_available; // Some FIGs can be mapped to a specific FIB or to FIB_ANY std::map > m_fibs; -- cgit v1.2.3 From e2e1a0f374aeffc933d0e5d295607197d6b74a4a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 10 Oct 2016 10:10:24 +0200 Subject: Fix dabOutputTCP shutdown using accept() with timeout --- src/TcpSocket.cpp | 28 ++++++++++++++++++++++++++++ src/TcpSocket.h | 3 +++ src/dabOutput/dabOutputTcp.cpp | 7 ++++++- 3 files changed, 37 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 13efece..6e7c31d 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -166,6 +166,34 @@ TcpSocket TcpSocket::accept() } } +boost::optional TcpSocket::accept(int timeout_ms) +{ + fd_set rfds; + struct timeval tv; + int retval; + + FD_ZERO(&rfds); + FD_SET(m_sock, &rfds); + + tv.tv_sec = 0; + tv.tv_usec = 1000ul * timeout_ms; + + retval = select(1, &rfds, NULL, NULL, &tv); + + if (retval == -1) { + stringstream ss; + ss << "TCP Socket accept error: " << strerror(errno); + throw std::runtime_error(ss.str()); + } + else if (retval) { + return accept(); + } + else { + return boost::none; + } +} + + InetAddress TcpSocket::getOwnAddress() const { return m_own_address; diff --git a/src/TcpSocket.h b/src/TcpSocket.h index f1354a7..5a4a808 100644 --- a/src/TcpSocket.h +++ b/src/TcpSocket.h @@ -46,6 +46,8 @@ #include #include +#include + /** * This class represents a TCP socket. */ @@ -84,6 +86,7 @@ class TcpSocket void listen(void); TcpSocket accept(void); + boost::optional accept(int timeout_ms); /** Retrieve address this socket is bound to */ InetAddress getOwnAddress() const; diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 2c5a067..343ba0f 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -128,9 +128,14 @@ class TCPDataDispatcher void process(long) { m_listener_socket.listen(); + const int timeout_ms = 1000; + while (m_running) { // Add a new TCPConnection to the list, constructing it from the client socket - m_connections.emplace(m_connections.begin(), m_listener_socket.accept()); + auto optional_sock = m_listener_socket.accept(timeout_ms); + if (optional_sock) { + m_connections.emplace(m_connections.begin(), std::move(*optional_sock)); + } } } -- cgit v1.2.3 From 5b410d7d1f398272cef6a031b561377bed3694f7 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 10 Oct 2016 10:30:11 +0200 Subject: Replace m_running by m_active in RC --- src/RemoteControl.cpp | 15 ++++++++------- src/RemoteControl.h | 18 ++++++++++-------- 2 files changed, 18 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 9ee1f24..305334b 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -85,13 +85,13 @@ void RemoteControllers::set_param( // thread. void RemoteControllerTelnet::restart_thread(long) { - m_running = false; - if (m_port) { m_child_thread.interrupt(); m_child_thread.join(); } + m_fault = false; + m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0); } @@ -110,7 +110,7 @@ void RemoteControllerTelnet::process(long) tcp::acceptor acceptor(io_service, tcp::endpoint( boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); - while (m_running) { + while (m_active) { in_message = ""; tcp::socket socket(io_service); @@ -123,7 +123,7 @@ void RemoteControllerTelnet::process(long) boost::asio::transfer_all(), ignored_error); - while (m_running && in_message != "quit") { + while (m_active && in_message != "quit") { boost::asio::write(socket, boost::asio::buffer(m_prompt), boost::asio::transfer_all(), ignored_error); @@ -160,7 +160,8 @@ void RemoteControllerTelnet::process(long) } } catch (std::exception& e) { - std::cerr << "Remote control caught exception: " << e.what() << std::endl; + etiLog.level(error) << + "Remote control caught exception: " << e.what(); m_fault = true; } } @@ -293,7 +294,7 @@ void RemoteControllerZmq::restart() // thread. void RemoteControllerZmq::restart_thread() { - m_running = false; + m_active = false; if (!m_endpoint.empty()) { m_child_thread.interrupt(); @@ -352,7 +353,7 @@ void RemoteControllerZmq::process() // create pollitem that polls the ZMQ sockets zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - for (;;) { + while (m_active) { zmq::poll(pollItems, 1, 100); std::vector msg; diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 1a81b42..da6f9ea 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -198,11 +198,11 @@ extern RemoteControllers rcs; class RemoteControllerTelnet : public BaseRemoteController { public: RemoteControllerTelnet() - : m_running(false), m_fault(false), + : m_active(false), m_fault(false), m_port(0) { } RemoteControllerTelnet(int port) - : m_running(true), m_fault(false), + : m_active(port > 0), m_fault(false), m_child_thread(&RemoteControllerTelnet::process, this, 0), m_port(port) { } @@ -210,7 +210,7 @@ class RemoteControllerTelnet : public BaseRemoteController { RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; ~RemoteControllerTelnet() { - m_running = false; + m_active = false; m_fault = false; if (m_port) { m_child_thread.interrupt(); @@ -243,7 +243,7 @@ class RemoteControllerTelnet : public BaseRemoteController { return all_tokens; } - std::atomic m_running; + std::atomic m_active; /* This is set to true if a fault occurred */ std::atomic m_fault; @@ -261,12 +261,12 @@ class RemoteControllerTelnet : public BaseRemoteController { class RemoteControllerZmq : public BaseRemoteController { public: RemoteControllerZmq() - : m_running(false), m_fault(false), + : m_active(false), m_fault(false), m_zmqContext(1), m_endpoint("") { } RemoteControllerZmq(const std::string& endpoint) - : m_running(true), m_fault(false), + : m_active(not endpoint.empty()), m_fault(false), m_zmqContext(1), m_endpoint(endpoint), m_child_thread(&RemoteControllerZmq::process, this) { } @@ -275,8 +275,10 @@ class RemoteControllerZmq : public BaseRemoteController { RemoteControllerZmq(const RemoteControllerZmq& other) = delete; ~RemoteControllerZmq() { - m_running = false; + m_active = false; m_fault = false; + + m_zmqContext.close(); if (!m_endpoint.empty()) { m_child_thread.interrupt(); m_child_thread.join(); @@ -295,7 +297,7 @@ class RemoteControllerZmq : public BaseRemoteController { void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); void process(); - std::atomic m_running; + std::atomic m_active; /* This is set to true if a fault occurred */ std::atomic m_fault; -- cgit v1.2.3 From f8f6f43702cfb18aa7113f024f96fa2bd2b249ae Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 10 Oct 2016 11:18:51 +0200 Subject: Fix TCP output display in startup summary --- src/dabOutput/dabOutputTcp.cpp | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src') diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 343ba0f..8696bec 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -196,6 +196,8 @@ int DabOutputTcp::Open(const char* name) string address; bool success = parse_uri(name, &port, address); + uri_ = name; + if (success) { dispatcher_ = new TCPDataDispatcher(); try { -- cgit v1.2.3 From f16f9c0634693ce0a53bb269aa2d36402e51f92f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 10 Oct 2016 11:19:15 +0200 Subject: Replace select() by poll() for TcpSocket accept --- src/TcpSocket.cpp | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) (limited to 'src') diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp index 6e7c31d..433e5c1 100644 --- a/src/TcpSocket.cpp +++ b/src/TcpSocket.cpp @@ -32,6 +32,7 @@ #include #include #include +#include using namespace std; @@ -168,17 +169,11 @@ TcpSocket TcpSocket::accept() boost::optional TcpSocket::accept(int timeout_ms) { - fd_set rfds; - struct timeval tv; - int retval; + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN | POLLOUT; - FD_ZERO(&rfds); - FD_SET(m_sock, &rfds); - - tv.tv_sec = 0; - tv.tv_usec = 1000ul * timeout_ms; - - retval = select(1, &rfds, NULL, NULL, &tv); + int retval = poll(fds, 1, timeout_ms); if (retval == -1) { stringstream ss; -- cgit v1.2.3 From 4576c71f10dc009ce0dd9aedbc2f81a3e1a8be0e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 10 Oct 2016 11:19:51 +0200 Subject: Fix telnet RC shutdown --- src/RemoteControl.cpp | 155 +++++++++++++++++++++++++++++++++----------------- src/RemoteControl.h | 41 +++++++------ 2 files changed, 121 insertions(+), 75 deletions(-) (limited to 'src') diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index 305334b..fe8b7cd 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -3,8 +3,10 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. @@ -36,6 +38,13 @@ using namespace std; RemoteControllers rcs; +RemoteControllerTelnet::~RemoteControllerTelnet() +{ + m_active = false; + m_io_service.stop(); + m_child_thread.join(); +} + void RemoteControllerTelnet::restart() { m_restarter_thread = boost::thread( @@ -85,85 +94,115 @@ void RemoteControllers::set_param( // thread. void RemoteControllerTelnet::restart_thread(long) { - if (m_port) { - m_child_thread.interrupt(); - m_child_thread.join(); - } + m_active = false; + m_io_service.stop(); - m_fault = false; + m_child_thread.join(); m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0); } -void RemoteControllerTelnet::process(long) +void RemoteControllerTelnet::handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor) { - std::string m_welcome = "ODR-DabMux Remote Control CLI\n" - "Write 'help' for help.\n" - "**********\n"; - std::string m_prompt = "> "; + + const std::string welcome = "ODR-DabMux Remote Control CLI\n" + "Write 'help' for help.\n" + "**********\n"; + const std::string prompt = "> "; std::string in_message; size_t length; - try { - boost::asio::io_service io_service; - tcp::acceptor acceptor(io_service, tcp::endpoint( - boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); - - while (m_active) { - in_message = ""; + if (boost_error) + { + etiLog.level(error) << "RC: Error accepting connection"; + return; + } - tcp::socket socket(io_service); + try { + etiLog.level(info) << "RC: Accepted"; - acceptor.accept(socket); + boost::system::error_code ignored_error; - boost::system::error_code ignored_error; + boost::asio::write(*socket, boost::asio::buffer(welcome), + boost::asio::transfer_all(), + ignored_error); - boost::asio::write(socket, boost::asio::buffer(m_welcome), + while (m_active && in_message != "quit") { + boost::asio::write(*socket, boost::asio::buffer(prompt), boost::asio::transfer_all(), ignored_error); - while (m_active && in_message != "quit") { - boost::asio::write(socket, boost::asio::buffer(m_prompt), - boost::asio::transfer_all(), - ignored_error); - - in_message = ""; + in_message = ""; - boost::asio::streambuf buffer; - length = boost::asio::read_until( socket, buffer, "\n", ignored_error); + boost::asio::streambuf buffer; + length = boost::asio::read_until(*socket, buffer, "\n", ignored_error); - std::istream str(&buffer); - std::getline(str, in_message); + std::istream str(&buffer); + std::getline(str, in_message); - if (length == 0) { - std::cerr << "RC: Connection terminated" << std::endl; - break; - } + if (length == 0) { + etiLog.level(info) << "RC: Connection terminated"; + break; + } - while (in_message.length() > 0 && - (in_message[in_message.length()-1] == '\r' || - in_message[in_message.length()-1] == '\n')) { - in_message.erase(in_message.length()-1, 1); - } + while (in_message.length() > 0 && + (in_message[in_message.length()-1] == '\r' || + in_message[in_message.length()-1] == '\n')) { + in_message.erase(in_message.length()-1, 1); + } - if (in_message.length() == 0) { - continue; - } + if (in_message.length() == 0) { + continue; + } - std::cerr << "RC: Got message '" << in_message << "'" << std::endl; + etiLog.level(info) << "RC: Got message '" << in_message << "'"; - dispatch_command(socket, in_message); - } - std::cerr << "RC: Closing socket" << std::endl; - socket.close(); + dispatch_command(*socket, in_message); } + etiLog.level(info) << "RC: Closing socket"; + socket->close(); } - catch (std::exception& e) { - etiLog.level(error) << - "Remote control caught exception: " << e.what(); - m_fault = true; + catch (std::exception& e) + { + etiLog.level(error) << "Remote control caught exception: " << e.what(); + } +} + +void RemoteControllerTelnet::process(long) +{ + m_active = true; + + while (m_active) { + m_io_service.reset(); + + tcp::acceptor acceptor(m_io_service, tcp::endpoint( + boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); + + + // Add a job to start accepting connections. + boost::shared_ptr socket( + new tcp::socket(acceptor.get_io_service())); + + // Add an accept call to the service. This will prevent io_service::run() + // from returning. + etiLog.level(info) << "RC: Waiting for connection on port " << m_port; + acceptor.async_accept(*socket, + boost::bind(&RemoteControllerTelnet::handle_accept, + this, + boost::asio::placeholders::error, + socket, + boost::ref(acceptor))); + + // Process event loop. + m_io_service.run(); } + + etiLog.level(info) << "RC: Leaving"; + m_fault = true; } void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) @@ -284,6 +323,16 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message) #if defined(HAVE_RC_ZEROMQ) +RemoteControllerZmq::~RemoteControllerZmq() { + m_active = false; + m_fault = false; + + if (!m_endpoint.empty()) { + m_child_thread.interrupt(); + m_child_thread.join(); + } +} + void RemoteControllerZmq::restart() { m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); diff --git a/src/RemoteControl.h b/src/RemoteControl.h index da6f9ea..1c830aa 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -198,25 +198,25 @@ extern RemoteControllers rcs; class RemoteControllerTelnet : public BaseRemoteController { public: RemoteControllerTelnet() - : m_active(false), m_fault(false), + : m_active(false), + m_io_service(), + m_fault(false), m_port(0) { } RemoteControllerTelnet(int port) - : m_active(port > 0), m_fault(false), - m_child_thread(&RemoteControllerTelnet::process, this, 0), - m_port(port) { } + : m_active(port > 0), + m_io_service(), + m_fault(false), + m_port(port) + { + restart(); + } + RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - ~RemoteControllerTelnet() { - m_active = false; - m_fault = false; - if (m_port) { - m_child_thread.interrupt(); - m_child_thread.join(); - } - } + ~RemoteControllerTelnet(); virtual bool fault_detected() { return m_fault; } @@ -232,6 +232,10 @@ class RemoteControllerTelnet : public BaseRemoteController { void reply(boost::asio::ip::tcp::socket& socket, std::string message); + void handle_accept( + const boost::system::error_code& boost_error, + boost::shared_ptr< boost::asio::ip::tcp::socket > socket, + boost::asio::ip::tcp::acceptor& acceptor); std::vector tokenise_(std::string message) { std::vector all_tokens; @@ -245,6 +249,8 @@ class RemoteControllerTelnet : public BaseRemoteController { std::atomic m_active; + boost::asio::io_service m_io_service; + /* This is set to true if a fault occurred */ std::atomic m_fault; boost::thread m_restarter_thread; @@ -274,16 +280,7 @@ class RemoteControllerZmq : public BaseRemoteController { RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; RemoteControllerZmq(const RemoteControllerZmq& other) = delete; - ~RemoteControllerZmq() { - m_active = false; - m_fault = false; - - m_zmqContext.close(); - if (!m_endpoint.empty()) { - m_child_thread.interrupt(); - m_child_thread.join(); - } - } + ~RemoteControllerZmq(); virtual bool fault_detected() { return m_fault; } -- cgit v1.2.3