summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:56:09 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-04-20 12:56:09 +0200
commit6b68f5862eadbe0bd37254435619ba92ee834635 (patch)
treeb6e4a8efd0e258e1c90616aa79de091c89899151
parent4de1e587d973c9bfcc52155f778a33fa9c969c83 (diff)
downloaddabmux-6b68f5862eadbe0bd37254435619ba92ee834635.tar.gz
dabmux-6b68f5862eadbe0bd37254435619ba92ee834635.tar.bz2
dabmux-6b68f5862eadbe0bd37254435619ba92ee834635.zip
Update RemoteControl, taking changes from ODR-DabMod
-rw-r--r--src/MuxElements.cpp10
-rw-r--r--src/RemoteControl.cpp62
-rw-r--r--src/RemoteControl.h14
-rw-r--r--src/input/Zmq.cpp38
-rw-r--r--src/input/Zmq.h2
5 files changed, 70 insertions, 56 deletions
diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp
index c4682e4..0e12515 100644
--- a/src/MuxElements.cpp
+++ b/src/MuxElements.cpp
@@ -424,23 +424,23 @@ void DabComponent::set_parameter(const string& parameter,
case 0:
break;
case -1:
- ss << m_name << " short label " <<
+ ss << m_rc_name << " short label " <<
fields[1] << " is not subset of label '" <<
fields[0] << "'";
etiLog.level(warn) << ss.str();
throw ParameterError(ss.str());
case -2:
- ss << m_name << " short label " <<
+ ss << m_rc_name << " short label " <<
fields[1] << " is too long (max 8 characters)";
etiLog.level(warn) << ss.str();
throw ParameterError(ss.str());
case -3:
- ss << m_name << " label " <<
+ ss << m_rc_name << " label " <<
fields[0] << " is too long (max 16 characters)";
etiLog.level(warn) << ss.str();
throw ParameterError(ss.str());
default:
- ss << m_name << " short label definition: program error !";
+ ss << m_rc_name << " short label definition: program error !";
etiLog.level(alert) << ss.str();
throw ParameterError(ss.str());
}
@@ -547,7 +547,7 @@ void DabService::set_parameter(const string& parameter, const string& value)
fields[0] << " is too long (max 16 characters)";
throw ParameterError(ss.str());
default:
- ss << m_name << " short label definition: program error !";
+ ss << m_rc_name << " short label definition: program error !";
etiLog.level(error) << ss.str();
throw ParameterError(ss.str());
}
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index fe8b7cd..517d99e 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -42,12 +42,23 @@ RemoteControllerTelnet::~RemoteControllerTelnet()
{
m_active = false;
m_io_service.stop();
- m_child_thread.join();
+
+ if (m_restarter_thread.joinable()) {
+ m_restarter_thread.join();
+ }
+
+ if (m_child_thread.joinable()) {
+ m_child_thread.join();
+ }
}
void RemoteControllerTelnet::restart()
{
- m_restarter_thread = boost::thread(
+ if (m_restarter_thread.joinable()) {
+ m_restarter_thread.join();
+ }
+
+ m_restarter_thread = std::thread(
&RemoteControllerTelnet::restart_thread,
this, 0);
}
@@ -97,9 +108,11 @@ void RemoteControllerTelnet::restart_thread(long)
m_active = false;
m_io_service.stop();
- m_child_thread.join();
+ if (m_child_thread.joinable()) {
+ m_child_thread.join();
+ }
- m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0);
+ m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0);
}
void RemoteControllerTelnet::handle_accept(
@@ -327,15 +340,22 @@ RemoteControllerZmq::~RemoteControllerZmq() {
m_active = false;
m_fault = false;
- if (!m_endpoint.empty()) {
- m_child_thread.interrupt();
+ if (m_restarter_thread.joinable()) {
+ m_restarter_thread.join();
+ }
+
+ if (m_child_thread.joinable()) {
m_child_thread.join();
}
}
void RemoteControllerZmq::restart()
{
- m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this);
+ if (m_restarter_thread.joinable()) {
+ m_restarter_thread.join();
+ }
+
+ m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this);
}
// This runs in a separate thread, because
@@ -345,12 +365,11 @@ void RemoteControllerZmq::restart_thread()
{
m_active = false;
- if (!m_endpoint.empty()) {
- m_child_thread.interrupt();
+ if (m_child_thread.joinable()) {
m_child_thread.join();
}
- m_child_thread = boost::thread(&RemoteControllerZmq::process, this);
+ m_child_thread = std::thread(&RemoteControllerZmq::process, this);
}
void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message)
@@ -388,7 +407,6 @@ void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::str
void RemoteControllerZmq::process()
{
// create zmq reply socket for receiving ctrl parameters
- etiLog.level(info) << "Starting zmq remote control thread";
try {
zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
@@ -422,11 +440,11 @@ void RemoteControllerZmq::process()
std::string msg_s = ss.str();
- zmq::message_t msg(ss.str().size());
- memcpy ((void*) msg.data(), msg_s.data(), msg_s.size());
+ zmq::message_t zmsg(ss.str().size());
+ memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size());
int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0;
- repSocket.send(msg, flag);
+ repSocket.send(zmsg, flag);
}
}
else if (msg.size() == 2 && command == "show") {
@@ -437,11 +455,11 @@ void RemoteControllerZmq::process()
for (auto &param_val : r) {
std::stringstream ss;
ss << param_val[0] << ": " << param_val[1] << endl;
- zmq::message_t msg(ss.str().size());
- memcpy(msg.data(), ss.str().data(), ss.str().size());
+ zmq::message_t zmsg(ss.str().size());
+ memcpy(zmsg.data(), ss.str().data(), ss.str().size());
int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0;
- repSocket.send(msg, flag);
+ repSocket.send(zmsg, flag);
}
}
catch (ParameterError &e) {
@@ -454,9 +472,9 @@ void RemoteControllerZmq::process()
try {
std::string value = rcs.get_param(module, parameter);
- zmq::message_t msg(value.size());
- memcpy ((void*) msg.data(), value.data(), value.size());
- repSocket.send(msg, 0);
+ zmq::message_t zmsg(value.size());
+ memcpy ((void*) zmsg.data(), value.data(), value.size());
+ repSocket.send(zmsg, 0);
}
catch (ParameterError &err) {
send_fail_reply(repSocket, err.what());
@@ -480,13 +498,9 @@ void RemoteControllerZmq::process()
"Unsupported command. commands: list, show, get, set");
}
}
-
- // check if thread is interrupted
- boost::this_thread::interruption_point();
}
repSocket.close();
}
- catch (boost::thread_interrupted&) {}
catch (zmq::error_t &e) {
etiLog.level(error) << "ZMQ RC error: " << std::string(e.what());
}
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index d8b3b6b..62299b0 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -94,7 +94,7 @@ class BaseRemoteController {
class RemoteControllable {
public:
RemoteControllable(const std::string& name) :
- m_name(name) {}
+ m_rc_name(name) {}
RemoteControllable(const RemoteControllable& other) = delete;
RemoteControllable& operator=(const RemoteControllable& other) = delete;
@@ -105,7 +105,7 @@ class RemoteControllable {
* It might be used in the commands the user has to type, so keep
* it short
*/
- virtual std::string get_rc_name() const { return m_name; }
+ virtual std::string get_rc_name() const { return m_rc_name; }
/* Return a list of possible parameters that can be set */
virtual std::list<std::string> get_supported_parameters() const;
@@ -126,7 +126,7 @@ class RemoteControllable {
virtual const std::string get_parameter(const std::string& parameter) const = 0;
protected:
- std::string m_name;
+ std::string m_rc_name;
std::list< std::vector<std::string> > m_parameters;
};
@@ -254,9 +254,9 @@ class RemoteControllerTelnet : public BaseRemoteController {
/* This is set to true if a fault occurred */
std::atomic<bool> m_fault;
- boost::thread m_restarter_thread;
+ std::thread m_restarter_thread;
- boost::thread m_child_thread;
+ std::thread m_child_thread;
int m_port;
};
@@ -299,12 +299,12 @@ class RemoteControllerZmq : public BaseRemoteController {
/* This is set to true if a fault occurred */
std::atomic<bool> m_fault;
- boost::thread m_restarter_thread;
+ std::thread m_restarter_thread;
zmq::context_t m_zmqContext;
std::string m_endpoint;
- boost::thread m_child_thread;
+ std::thread m_child_thread;
};
#endif
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
index 1dd49af..2e35907 100644
--- a/src/input/Zmq.cpp
+++ b/src/input/Zmq.cpp
@@ -93,7 +93,7 @@ void ZmqBase::rebind()
m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str());
}
catch (const zmq::error_t& err) {
- etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed";
+ etiLog.level(warn) << "ZMQ unbind for input " << m_rc_name << " failed";
}
}
@@ -105,7 +105,7 @@ void ZmqBase::rebind()
if (rc < 0) {
etiLog.level(warn) << "Invalid public key for input " <<
- m_name;
+ m_rc_name;
INVALIDATE_KEY(m_curve_public_key);
}
@@ -116,7 +116,7 @@ void ZmqBase::rebind()
if (rc < 0) {
etiLog.level(warn) << "Invalid secret key for input " <<
- m_name;
+ m_rc_name;
INVALIDATE_KEY(m_curve_secret_key);
}
@@ -127,7 +127,7 @@ void ZmqBase::rebind()
if (rc < 0) {
etiLog.level(warn) << "Invalid encoder key for input " <<
- m_name;
+ m_rc_name;
INVALIDATE_KEY(m_curve_encoder_key);
}
@@ -154,7 +154,7 @@ void ZmqBase::rebind()
}
catch (const zmq::error_t& err) {
std::ostringstream os;
- os << "ZMQ set encoder key for input " << m_name << " failed" <<
+ os << "ZMQ set encoder key for input " << m_rc_name << " failed" <<
err.what();
throw std::runtime_error(os.str());
}
@@ -165,7 +165,7 @@ void ZmqBase::rebind()
}
catch (const zmq::error_t& err) {
std::ostringstream os;
- os << "ZMQ set public key for input " << m_name << " failed" <<
+ os << "ZMQ set public key for input " << m_rc_name << " failed" <<
err.what();
throw std::runtime_error(os.str());
}
@@ -176,7 +176,7 @@ void ZmqBase::rebind()
}
catch (const zmq::error_t& err) {
std::ostringstream os;
- os << "ZMQ set secret key for input " << m_name << " failed" <<
+ os << "ZMQ set secret key for input " << m_rc_name << " failed" <<
err.what();
throw std::runtime_error(os.str());
}
@@ -191,7 +191,7 @@ void ZmqBase::rebind()
}
catch (const zmq::error_t& err) {
etiLog.level(warn) << "ZMQ disable encryption keys for input " <<
- m_name << " failed: " << err.what();
+ m_rc_name << " failed: " << err.what();
}
}
@@ -202,7 +202,7 @@ void ZmqBase::rebind()
}
catch (const zmq::error_t& err) {
std::ostringstream os;
- os << "ZMQ bind for input " << m_name << " failed" <<
+ os << "ZMQ bind for input " << m_rc_name << " failed" <<
err.what();
throw std::runtime_error(os.str());
}
@@ -214,7 +214,7 @@ void ZmqBase::rebind()
}
catch (const zmq::error_t& err) {
std::ostringstream os;
- os << "ZMQ set socket options for input " << m_name << " failed" <<
+ os << "ZMQ set socket options for input " << m_rc_name << " failed" <<
err.what();
throw std::runtime_error(os.str());
}
@@ -300,7 +300,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
m_prebuf_current--;
if (m_prebuf_current == 0)
etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
- m_name.c_str());
+ m_rc_name.c_str());
/* During prebuffering, give a zeroed frame to the mux */
m_stats.notifyUnderrun();
@@ -313,7 +313,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
if (m_frame_buffer.empty()) {
etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
- m_name.c_str());
+ m_rc_name.c_str());
// reset prebuffering
m_prebuf_current = m_config.prebuffering;
@@ -350,7 +350,7 @@ int ZmqMPEG::readFromSocket(size_t framesize)
}
catch (const zmq::error_t& err) {
etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " <<
- m_name << ": " << err.what();
+ m_rc_name << ": " << err.what();
}
/* This is the old 'one superframe per ZMQ message' format */
@@ -375,7 +375,7 @@ int ZmqMPEG::readFromSocket(size_t framesize)
if (datalen == framesize) {
if (m_frame_buffer.size() > m_config.buffer_size) {
etiLog.level(warn) <<
- "inputZMQ " << m_name <<
+ "inputZMQ " << m_rc_name <<
" buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming frame !";
messageReceived = false;
@@ -392,7 +392,7 @@ int ZmqMPEG::readFromSocket(size_t framesize)
}
else {
etiLog.level(error) <<
- "inputZMQ " << m_name <<
+ "inputZMQ " << m_rc_name <<
" verify bitrate: recv'd " << msg.size() << " B" <<
", need " << framesize << ".";
messageReceived = false;
@@ -420,7 +420,7 @@ int ZmqAAC::readFromSocket(size_t framesize)
catch (const zmq::error_t& err) {
etiLog.level(error) <<
"Failed to receive AAC superframe from zmq socket " <<
- m_name << ": " << err.what();
+ m_rc_name << ": " << err.what();
}
/* This is the old 'one superframe per ZMQ message' format */
@@ -450,7 +450,7 @@ int ZmqAAC::readFromSocket(size_t framesize)
if (datalen == 5*framesize) {
if (m_frame_buffer.size() > m_config.buffer_size) {
etiLog.level(warn) <<
- "inputZMQ " << m_name <<
+ "inputZMQ " << m_rc_name <<
" buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming superframe !";
datalen = 0;
@@ -471,7 +471,7 @@ int ZmqAAC::readFromSocket(size_t framesize)
}
else {
etiLog.level(error) <<
- "inputZMQ " << m_name <<
+ "inputZMQ " << m_rc_name <<
" verify bitrate: recv'd " << msg.size() << " B" <<
", need " << 5*framesize << ".";
@@ -480,7 +480,7 @@ int ZmqAAC::readFromSocket(size_t framesize)
}
else {
etiLog.level(error) <<
- "inputZMQ " << m_name <<
+ "inputZMQ " << m_rc_name <<
" invalid frame received";
}
diff --git a/src/input/Zmq.h b/src/input/Zmq.h
index 1c2016a..eb67fe5 100644
--- a/src/input/Zmq.h
+++ b/src/input/Zmq.h
@@ -156,7 +156,7 @@ class ZmqBase : public InputBase, public RemoteControllable {
m_bitrate(0),
m_enable_input(true),
m_config(config),
- m_stats(m_name),
+ m_stats(name),
m_prebuf_current(config.prebuffering) {
RC_ADD_PARAMETER(enable,
"If the input is enabled. Set to zero to empty the buffer.");