aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/RemoteControl.cpp154
-rw-r--r--src/RemoteControl.h47
-rw-r--r--src/TcpSocket.cpp23
-rw-r--r--src/TcpSocket.h3
-rw-r--r--src/dabOutput/dabOutputTcp.cpp9
-rw-r--r--src/fig/FIGCarousel.cpp28
-rw-r--r--src/fig/FIGCarousel.h5
7 files changed, 164 insertions, 105 deletions
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index 31e63f2..fe8b7cd 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -38,6 +38,13 @@ using namespace std;
RemoteControllers rcs;
+RemoteControllerTelnet::~RemoteControllerTelnet()
+{
+ m_active = false;
+ m_io_service.stop();
+ m_child_thread.join();
+}
+
void RemoteControllerTelnet::restart()
{
m_restarter_thread = boost::thread(
@@ -87,84 +94,115 @@ void RemoteControllers::set_param(
// thread.
void RemoteControllerTelnet::restart_thread(long)
{
- m_running = false;
+ m_active = false;
+ m_io_service.stop();
- if (m_port) {
- m_child_thread.interrupt();
- m_child_thread.join();
- }
+ m_child_thread.join();
m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0);
}
-void RemoteControllerTelnet::process(long)
+void RemoteControllerTelnet::handle_accept(
+ const boost::system::error_code& boost_error,
+ boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
+ boost::asio::ip::tcp::acceptor& acceptor)
{
- std::string m_welcome = "ODR-DabMux Remote Control CLI\n"
- "Write 'help' for help.\n"
- "**********\n";
- std::string m_prompt = "> ";
+
+ const std::string welcome = "ODR-DabMux Remote Control CLI\n"
+ "Write 'help' for help.\n"
+ "**********\n";
+ const std::string prompt = "> ";
std::string in_message;
size_t length;
- try {
- boost::asio::io_service io_service;
- tcp::acceptor acceptor(io_service, tcp::endpoint(
- boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
-
- while (m_running) {
- in_message = "";
+ if (boost_error)
+ {
+ etiLog.level(error) << "RC: Error accepting connection";
+ return;
+ }
- tcp::socket socket(io_service);
+ try {
+ etiLog.level(info) << "RC: Accepted";
- acceptor.accept(socket);
+ boost::system::error_code ignored_error;
- boost::system::error_code ignored_error;
+ boost::asio::write(*socket, boost::asio::buffer(welcome),
+ boost::asio::transfer_all(),
+ ignored_error);
- boost::asio::write(socket, boost::asio::buffer(m_welcome),
+ while (m_active && in_message != "quit") {
+ boost::asio::write(*socket, boost::asio::buffer(prompt),
boost::asio::transfer_all(),
ignored_error);
- while (m_running && in_message != "quit") {
- boost::asio::write(socket, boost::asio::buffer(m_prompt),
- boost::asio::transfer_all(),
- ignored_error);
-
- in_message = "";
+ in_message = "";
- boost::asio::streambuf buffer;
- length = boost::asio::read_until( socket, buffer, "\n", ignored_error);
+ boost::asio::streambuf buffer;
+ length = boost::asio::read_until(*socket, buffer, "\n", ignored_error);
- std::istream str(&buffer);
- std::getline(str, in_message);
+ std::istream str(&buffer);
+ std::getline(str, in_message);
- if (length == 0) {
- std::cerr << "RC: Connection terminated" << std::endl;
- break;
- }
+ if (length == 0) {
+ etiLog.level(info) << "RC: Connection terminated";
+ break;
+ }
- while (in_message.length() > 0 &&
- (in_message[in_message.length()-1] == '\r' ||
- in_message[in_message.length()-1] == '\n')) {
- in_message.erase(in_message.length()-1, 1);
- }
+ while (in_message.length() > 0 &&
+ (in_message[in_message.length()-1] == '\r' ||
+ in_message[in_message.length()-1] == '\n')) {
+ in_message.erase(in_message.length()-1, 1);
+ }
- if (in_message.length() == 0) {
- continue;
- }
+ if (in_message.length() == 0) {
+ continue;
+ }
- std::cerr << "RC: Got message '" << in_message << "'" << std::endl;
+ etiLog.level(info) << "RC: Got message '" << in_message << "'";
- dispatch_command(socket, in_message);
- }
- std::cerr << "RC: Closing socket" << std::endl;
- socket.close();
+ dispatch_command(*socket, in_message);
}
+ etiLog.level(info) << "RC: Closing socket";
+ socket->close();
}
- catch (std::exception& e) {
- std::cerr << "Remote control caught exception: " << e.what() << std::endl;
- m_fault = true;
+ catch (std::exception& e)
+ {
+ etiLog.level(error) << "Remote control caught exception: " << e.what();
+ }
+}
+
+void RemoteControllerTelnet::process(long)
+{
+ m_active = true;
+
+ while (m_active) {
+ m_io_service.reset();
+
+ tcp::acceptor acceptor(m_io_service, tcp::endpoint(
+ boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
+
+
+ // Add a job to start accepting connections.
+ boost::shared_ptr<tcp::socket> socket(
+ new tcp::socket(acceptor.get_io_service()));
+
+ // Add an accept call to the service. This will prevent io_service::run()
+ // from returning.
+ etiLog.level(info) << "RC: Waiting for connection on port " << m_port;
+ acceptor.async_accept(*socket,
+ boost::bind(&RemoteControllerTelnet::handle_accept,
+ this,
+ boost::asio::placeholders::error,
+ socket,
+ boost::ref(acceptor)));
+
+ // Process event loop.
+ m_io_service.run();
}
+
+ etiLog.level(info) << "RC: Leaving";
+ m_fault = true;
}
void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
@@ -285,6 +323,16 @@ void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
#if defined(HAVE_RC_ZEROMQ)
+RemoteControllerZmq::~RemoteControllerZmq() {
+ m_active = false;
+ m_fault = false;
+
+ if (!m_endpoint.empty()) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
+}
+
void RemoteControllerZmq::restart()
{
m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this);
@@ -295,7 +343,7 @@ void RemoteControllerZmq::restart()
// thread.
void RemoteControllerZmq::restart_thread()
{
- m_running = false;
+ m_active = false;
if (!m_endpoint.empty()) {
m_child_thread.interrupt();
@@ -354,7 +402,7 @@ void RemoteControllerZmq::process()
// create pollitem that polls the ZMQ sockets
zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
- for (;;) {
+ while (m_active) {
zmq::poll(pollItems, 1, 100);
std::vector<std::string> msg;
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index fe8ac42..d8b3b6b 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -199,25 +199,25 @@ extern RemoteControllers rcs;
class RemoteControllerTelnet : public BaseRemoteController {
public:
RemoteControllerTelnet()
- : m_running(false), m_fault(false),
+ : m_active(false),
+ m_io_service(),
+ m_fault(false),
m_port(0) { }
RemoteControllerTelnet(int port)
- : m_running(true), m_fault(false),
- m_child_thread(&RemoteControllerTelnet::process, this, 0),
- m_port(port) { }
+ : m_active(port > 0),
+ m_io_service(),
+ m_fault(false),
+ m_port(port)
+ {
+ restart();
+ }
+
RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete;
RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete;
- ~RemoteControllerTelnet() {
- m_running = false;
- m_fault = false;
- if (m_port) {
- m_child_thread.interrupt();
- m_child_thread.join();
- }
- }
+ ~RemoteControllerTelnet();
virtual bool fault_detected() { return m_fault; }
@@ -233,6 +233,10 @@ class RemoteControllerTelnet : public BaseRemoteController {
void reply(boost::asio::ip::tcp::socket& socket, std::string message);
+ void handle_accept(
+ const boost::system::error_code& boost_error,
+ boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
+ boost::asio::ip::tcp::acceptor& acceptor);
std::vector<std::string> tokenise_(std::string message) {
std::vector<std::string> all_tokens;
@@ -244,7 +248,9 @@ class RemoteControllerTelnet : public BaseRemoteController {
return all_tokens;
}
- std::atomic<bool> m_running;
+ std::atomic<bool> m_active;
+
+ boost::asio::io_service m_io_service;
/* This is set to true if a fault occurred */
std::atomic<bool> m_fault;
@@ -262,12 +268,12 @@ class RemoteControllerTelnet : public BaseRemoteController {
class RemoteControllerZmq : public BaseRemoteController {
public:
RemoteControllerZmq()
- : m_running(false), m_fault(false),
+ : m_active(false), m_fault(false),
m_zmqContext(1),
m_endpoint("") { }
RemoteControllerZmq(const std::string& endpoint)
- : m_running(true), m_fault(false),
+ : m_active(not endpoint.empty()), m_fault(false),
m_zmqContext(1),
m_endpoint(endpoint),
m_child_thread(&RemoteControllerZmq::process, this) { }
@@ -275,14 +281,7 @@ class RemoteControllerZmq : public BaseRemoteController {
RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete;
RemoteControllerZmq(const RemoteControllerZmq& other) = delete;
- ~RemoteControllerZmq() {
- m_running = false;
- m_fault = false;
- if (!m_endpoint.empty()) {
- m_child_thread.interrupt();
- m_child_thread.join();
- }
- }
+ ~RemoteControllerZmq();
virtual bool fault_detected() { return m_fault; }
@@ -296,7 +295,7 @@ class RemoteControllerZmq : public BaseRemoteController {
void send_fail_reply(zmq::socket_t &pSocket, const std::string &error);
void process();
- std::atomic<bool> m_running;
+ std::atomic<bool> m_active;
/* This is set to true if a fault occurred */
std::atomic<bool> m_fault;
diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp
index 13efece..433e5c1 100644
--- a/src/TcpSocket.cpp
+++ b/src/TcpSocket.cpp
@@ -32,6 +32,7 @@
#include <string.h>
#include <signal.h>
#include <stdint.h>
+#include <poll.h>
using namespace std;
@@ -166,6 +167,28 @@ TcpSocket TcpSocket::accept()
}
}
+boost::optional<TcpSocket> TcpSocket::accept(int timeout_ms)
+{
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN | POLLOUT;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ stringstream ss;
+ ss << "TCP Socket accept error: " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+ else if (retval) {
+ return accept();
+ }
+ else {
+ return boost::none;
+ }
+}
+
+
InetAddress TcpSocket::getOwnAddress() const
{
return m_own_address;
diff --git a/src/TcpSocket.h b/src/TcpSocket.h
index f1354a7..5a4a808 100644
--- a/src/TcpSocket.h
+++ b/src/TcpSocket.h
@@ -46,6 +46,8 @@
#include <iostream>
#include <string>
+#include <boost/optional.hpp>
+
/**
* This class represents a TCP socket.
*/
@@ -84,6 +86,7 @@ class TcpSocket
void listen(void);
TcpSocket accept(void);
+ boost::optional<TcpSocket> accept(int timeout_ms);
/** Retrieve address this socket is bound to */
InetAddress getOwnAddress() const;
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 2c5a067..8696bec 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -128,9 +128,14 @@ class TCPDataDispatcher
void process(long) {
m_listener_socket.listen();
+ const int timeout_ms = 1000;
+
while (m_running) {
// Add a new TCPConnection to the list, constructing it from the client socket
- m_connections.emplace(m_connections.begin(), m_listener_socket.accept());
+ auto optional_sock = m_listener_socket.accept(timeout_ms);
+ if (optional_sock) {
+ m_connections.emplace(m_connections.begin(), std::move(*optional_sock));
+ }
}
}
@@ -191,6 +196,8 @@ int DabOutputTcp::Open(const char* name)
string address;
bool success = parse_uri(name, &port, address);
+ uri_ = name;
+
if (success) {
dispatcher_ = new TCPDataDispatcher();
try {
diff --git a/src/fig/FIGCarousel.cpp b/src/fig/FIGCarousel.cpp
index bd2bf51..ac2a80b 100644
--- a/src/fig/FIGCarousel.cpp
+++ b/src/fig/FIGCarousel.cpp
@@ -113,11 +113,11 @@ FIGCarousel::FIGCarousel(std::shared_ptr<dabEnsemble> ensemble) :
void FIGCarousel::load_and_allocate(IFIG& fig, FIBAllocation fib)
{
- int type = fig.figtype();
- int extension = fig.figextension();
-
- m_figs_available[std::make_pair(type, extension)] = &fig;
- allocate_fig_to_fib(type, extension, fib);
+ FIGCarouselElement el;
+ el.fig = &fig;
+ el.deadline = 0;
+ el.increase_deadline();
+ m_fibs[fib].push_back(el);
}
void FIGCarousel::update(unsigned long currentFrame)
@@ -125,24 +125,6 @@ void FIGCarousel::update(unsigned long currentFrame)
m_rti.currentFrame = currentFrame;
}
-void FIGCarousel::allocate_fig_to_fib(int figtype, int extension, FIBAllocation fib)
-{
- auto fig = m_figs_available.find(std::make_pair(figtype, extension));
-
- if (fig != m_figs_available.end()) {
- FIGCarouselElement el;
- el.fig = fig->second;
- el.deadline = 0;
- el.increase_deadline();
- m_fibs[fib].push_back(el);
- }
- else {
- std::stringstream ss;
- ss << "No FIG " << figtype << "/" << extension << " available";
- throw std::runtime_error(ss.str());
- }
-}
-
void dumpfib(const uint8_t *buf, size_t bufsize) {
std::cerr << "FIB ";
for (size_t i = 0; i < bufsize; i++) {
diff --git a/src/fig/FIGCarousel.h b/src/fig/FIGCarousel.h
index be0b23f..f52f266 100644
--- a/src/fig/FIGCarousel.h
+++ b/src/fig/FIGCarousel.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2015
+ Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
Implementation of the FIG carousel to schedule the FIGs into the
@@ -62,8 +62,6 @@ class FIGCarousel {
void update(unsigned long currentFrame);
- void allocate_fig_to_fib(int figtype, int extension, FIBAllocation fib);
-
/* Write all FIBs to the buffer, including correct padding and crc.
* Returns number of bytes written.
*
@@ -81,7 +79,6 @@ class FIGCarousel {
void load_and_allocate(IFIG& fig, FIBAllocation fib);
FIGRuntimeInformation m_rti;
- std::map<std::pair<int, int>, IFIG*> m_figs_available;
// Some FIGs can be mapped to a specific FIB or to FIB_ANY
std::map<FIBAllocation, std::list<FIGCarouselElement> > m_fibs;