diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-20 12:56:09 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-04-20 12:56:09 +0200 |
commit | 6b68f5862eadbe0bd37254435619ba92ee834635 (patch) | |
tree | b6e4a8efd0e258e1c90616aa79de091c89899151 | |
parent | 4de1e587d973c9bfcc52155f778a33fa9c969c83 (diff) | |
download | dabmux-6b68f5862eadbe0bd37254435619ba92ee834635.tar.gz dabmux-6b68f5862eadbe0bd37254435619ba92ee834635.tar.bz2 dabmux-6b68f5862eadbe0bd37254435619ba92ee834635.zip |
Update RemoteControl, taking changes from ODR-DabMod
-rw-r--r-- | src/MuxElements.cpp | 10 | ||||
-rw-r--r-- | src/RemoteControl.cpp | 62 | ||||
-rw-r--r-- | src/RemoteControl.h | 14 | ||||
-rw-r--r-- | src/input/Zmq.cpp | 38 | ||||
-rw-r--r-- | src/input/Zmq.h | 2 |
5 files changed, 70 insertions, 56 deletions
diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp index c4682e4..0e12515 100644 --- a/src/MuxElements.cpp +++ b/src/MuxElements.cpp @@ -424,23 +424,23 @@ void DabComponent::set_parameter(const string& parameter, case 0: break; case -1: - ss << m_name << " short label " << + ss << m_rc_name << " short label " << fields[1] << " is not subset of label '" << fields[0] << "'"; etiLog.level(warn) << ss.str(); throw ParameterError(ss.str()); case -2: - ss << m_name << " short label " << + ss << m_rc_name << " short label " << fields[1] << " is too long (max 8 characters)"; etiLog.level(warn) << ss.str(); throw ParameterError(ss.str()); case -3: - ss << m_name << " label " << + ss << m_rc_name << " label " << fields[0] << " is too long (max 16 characters)"; etiLog.level(warn) << ss.str(); throw ParameterError(ss.str()); default: - ss << m_name << " short label definition: program error !"; + ss << m_rc_name << " short label definition: program error !"; etiLog.level(alert) << ss.str(); throw ParameterError(ss.str()); } @@ -547,7 +547,7 @@ void DabService::set_parameter(const string& parameter, const string& value) fields[0] << " is too long (max 16 characters)"; throw ParameterError(ss.str()); default: - ss << m_name << " short label definition: program error !"; + ss << m_rc_name << " short label definition: program error !"; etiLog.level(error) << ss.str(); throw ParameterError(ss.str()); } diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp index fe8b7cd..517d99e 100644 --- a/src/RemoteControl.cpp +++ b/src/RemoteControl.cpp @@ -42,12 +42,23 @@ RemoteControllerTelnet::~RemoteControllerTelnet() { m_active = false; m_io_service.stop(); - m_child_thread.join(); + + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + + if (m_child_thread.joinable()) { + m_child_thread.join(); + } } void RemoteControllerTelnet::restart() { - m_restarter_thread = boost::thread( + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + + m_restarter_thread = std::thread( &RemoteControllerTelnet::restart_thread, this, 0); } @@ -97,9 +108,11 @@ void RemoteControllerTelnet::restart_thread(long) m_active = false; m_io_service.stop(); - m_child_thread.join(); + if (m_child_thread.joinable()) { + m_child_thread.join(); + } - m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0); + m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); } void RemoteControllerTelnet::handle_accept( @@ -327,15 +340,22 @@ RemoteControllerZmq::~RemoteControllerZmq() { m_active = false; m_fault = false; - if (!m_endpoint.empty()) { - m_child_thread.interrupt(); + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + + if (m_child_thread.joinable()) { m_child_thread.join(); } } void RemoteControllerZmq::restart() { - m_restarter_thread = boost::thread(&RemoteControllerZmq::restart_thread, this); + if (m_restarter_thread.joinable()) { + m_restarter_thread.join(); + } + + m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); } // This runs in a separate thread, because @@ -345,12 +365,11 @@ void RemoteControllerZmq::restart_thread() { m_active = false; - if (!m_endpoint.empty()) { - m_child_thread.interrupt(); + if (m_child_thread.joinable()) { m_child_thread.join(); } - m_child_thread = boost::thread(&RemoteControllerZmq::process, this); + m_child_thread = std::thread(&RemoteControllerZmq::process, this); } void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) @@ -388,7 +407,6 @@ void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::str void RemoteControllerZmq::process() { // create zmq reply socket for receiving ctrl parameters - etiLog.level(info) << "Starting zmq remote control thread"; try { zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); @@ -422,11 +440,11 @@ void RemoteControllerZmq::process() std::string msg_s = ss.str(); - zmq::message_t msg(ss.str().size()); - memcpy ((void*) msg.data(), msg_s.data(), msg_s.size()); + zmq::message_t zmsg(ss.str().size()); + memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; - repSocket.send(msg, flag); + repSocket.send(zmsg, flag); } } else if (msg.size() == 2 && command == "show") { @@ -437,11 +455,11 @@ void RemoteControllerZmq::process() for (auto ¶m_val : r) { std::stringstream ss; ss << param_val[0] << ": " << param_val[1] << endl; - zmq::message_t msg(ss.str().size()); - memcpy(msg.data(), ss.str().data(), ss.str().size()); + zmq::message_t zmsg(ss.str().size()); + memcpy(zmsg.data(), ss.str().data(), ss.str().size()); int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; - repSocket.send(msg, flag); + repSocket.send(zmsg, flag); } } catch (ParameterError &e) { @@ -454,9 +472,9 @@ void RemoteControllerZmq::process() try { std::string value = rcs.get_param(module, parameter); - zmq::message_t msg(value.size()); - memcpy ((void*) msg.data(), value.data(), value.size()); - repSocket.send(msg, 0); + zmq::message_t zmsg(value.size()); + memcpy ((void*) zmsg.data(), value.data(), value.size()); + repSocket.send(zmsg, 0); } catch (ParameterError &err) { send_fail_reply(repSocket, err.what()); @@ -480,13 +498,9 @@ void RemoteControllerZmq::process() "Unsupported command. commands: list, show, get, set"); } } - - // check if thread is interrupted - boost::this_thread::interruption_point(); } repSocket.close(); } - catch (boost::thread_interrupted&) {} catch (zmq::error_t &e) { etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); } diff --git a/src/RemoteControl.h b/src/RemoteControl.h index d8b3b6b..62299b0 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -94,7 +94,7 @@ class BaseRemoteController { class RemoteControllable { public: RemoteControllable(const std::string& name) : - m_name(name) {} + m_rc_name(name) {} RemoteControllable(const RemoteControllable& other) = delete; RemoteControllable& operator=(const RemoteControllable& other) = delete; @@ -105,7 +105,7 @@ class RemoteControllable { * It might be used in the commands the user has to type, so keep * it short */ - virtual std::string get_rc_name() const { return m_name; } + virtual std::string get_rc_name() const { return m_rc_name; } /* Return a list of possible parameters that can be set */ virtual std::list<std::string> get_supported_parameters() const; @@ -126,7 +126,7 @@ class RemoteControllable { virtual const std::string get_parameter(const std::string& parameter) const = 0; protected: - std::string m_name; + std::string m_rc_name; std::list< std::vector<std::string> > m_parameters; }; @@ -254,9 +254,9 @@ class RemoteControllerTelnet : public BaseRemoteController { /* This is set to true if a fault occurred */ std::atomic<bool> m_fault; - boost::thread m_restarter_thread; + std::thread m_restarter_thread; - boost::thread m_child_thread; + std::thread m_child_thread; int m_port; }; @@ -299,12 +299,12 @@ class RemoteControllerZmq : public BaseRemoteController { /* This is set to true if a fault occurred */ std::atomic<bool> m_fault; - boost::thread m_restarter_thread; + std::thread m_restarter_thread; zmq::context_t m_zmqContext; std::string m_endpoint; - boost::thread m_child_thread; + std::thread m_child_thread; }; #endif diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 1dd49af..2e35907 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -93,7 +93,7 @@ void ZmqBase::rebind() m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str()); } catch (const zmq::error_t& err) { - etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed"; + etiLog.level(warn) << "ZMQ unbind for input " << m_rc_name << " failed"; } } @@ -105,7 +105,7 @@ void ZmqBase::rebind() if (rc < 0) { etiLog.level(warn) << "Invalid public key for input " << - m_name; + m_rc_name; INVALIDATE_KEY(m_curve_public_key); } @@ -116,7 +116,7 @@ void ZmqBase::rebind() if (rc < 0) { etiLog.level(warn) << "Invalid secret key for input " << - m_name; + m_rc_name; INVALIDATE_KEY(m_curve_secret_key); } @@ -127,7 +127,7 @@ void ZmqBase::rebind() if (rc < 0) { etiLog.level(warn) << "Invalid encoder key for input " << - m_name; + m_rc_name; INVALIDATE_KEY(m_curve_encoder_key); } @@ -154,7 +154,7 @@ void ZmqBase::rebind() } catch (const zmq::error_t& err) { std::ostringstream os; - os << "ZMQ set encoder key for input " << m_name << " failed" << + os << "ZMQ set encoder key for input " << m_rc_name << " failed" << err.what(); throw std::runtime_error(os.str()); } @@ -165,7 +165,7 @@ void ZmqBase::rebind() } catch (const zmq::error_t& err) { std::ostringstream os; - os << "ZMQ set public key for input " << m_name << " failed" << + os << "ZMQ set public key for input " << m_rc_name << " failed" << err.what(); throw std::runtime_error(os.str()); } @@ -176,7 +176,7 @@ void ZmqBase::rebind() } catch (const zmq::error_t& err) { std::ostringstream os; - os << "ZMQ set secret key for input " << m_name << " failed" << + os << "ZMQ set secret key for input " << m_rc_name << " failed" << err.what(); throw std::runtime_error(os.str()); } @@ -191,7 +191,7 @@ void ZmqBase::rebind() } catch (const zmq::error_t& err) { etiLog.level(warn) << "ZMQ disable encryption keys for input " << - m_name << " failed: " << err.what(); + m_rc_name << " failed: " << err.what(); } } @@ -202,7 +202,7 @@ void ZmqBase::rebind() } catch (const zmq::error_t& err) { std::ostringstream os; - os << "ZMQ bind for input " << m_name << " failed" << + os << "ZMQ bind for input " << m_rc_name << " failed" << err.what(); throw std::runtime_error(os.str()); } @@ -214,7 +214,7 @@ void ZmqBase::rebind() } catch (const zmq::error_t& err) { std::ostringstream os; - os << "ZMQ set socket options for input " << m_name << " failed" << + os << "ZMQ set socket options for input " << m_rc_name << " failed" << err.what(); throw std::runtime_error(os.str()); } @@ -300,7 +300,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) m_prebuf_current--; if (m_prebuf_current == 0) etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", - m_name.c_str()); + m_rc_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ m_stats.notifyUnderrun(); @@ -313,7 +313,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size) if (m_frame_buffer.empty()) { etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", - m_name.c_str()); + m_rc_name.c_str()); // reset prebuffering m_prebuf_current = m_config.prebuffering; @@ -350,7 +350,7 @@ int ZmqMPEG::readFromSocket(size_t framesize) } catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " << - m_name << ": " << err.what(); + m_rc_name << ": " << err.what(); } /* This is the old 'one superframe per ZMQ message' format */ @@ -375,7 +375,7 @@ int ZmqMPEG::readFromSocket(size_t framesize) if (datalen == framesize) { if (m_frame_buffer.size() > m_config.buffer_size) { etiLog.level(warn) << - "inputZMQ " << m_name << + "inputZMQ " << m_rc_name << " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming frame !"; messageReceived = false; @@ -392,7 +392,7 @@ int ZmqMPEG::readFromSocket(size_t framesize) } else { etiLog.level(error) << - "inputZMQ " << m_name << + "inputZMQ " << m_rc_name << " verify bitrate: recv'd " << msg.size() << " B" << ", need " << framesize << "."; messageReceived = false; @@ -420,7 +420,7 @@ int ZmqAAC::readFromSocket(size_t framesize) catch (const zmq::error_t& err) { etiLog.level(error) << "Failed to receive AAC superframe from zmq socket " << - m_name << ": " << err.what(); + m_rc_name << ": " << err.what(); } /* This is the old 'one superframe per ZMQ message' format */ @@ -450,7 +450,7 @@ int ZmqAAC::readFromSocket(size_t framesize) if (datalen == 5*framesize) { if (m_frame_buffer.size() > m_config.buffer_size) { etiLog.level(warn) << - "inputZMQ " << m_name << + "inputZMQ " << m_rc_name << " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming superframe !"; datalen = 0; @@ -471,7 +471,7 @@ int ZmqAAC::readFromSocket(size_t framesize) } else { etiLog.level(error) << - "inputZMQ " << m_name << + "inputZMQ " << m_rc_name << " verify bitrate: recv'd " << msg.size() << " B" << ", need " << 5*framesize << "."; @@ -480,7 +480,7 @@ int ZmqAAC::readFromSocket(size_t framesize) } else { etiLog.level(error) << - "inputZMQ " << m_name << + "inputZMQ " << m_rc_name << " invalid frame received"; } diff --git a/src/input/Zmq.h b/src/input/Zmq.h index 1c2016a..eb67fe5 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -156,7 +156,7 @@ class ZmqBase : public InputBase, public RemoteControllable { m_bitrate(0), m_enable_input(true), m_config(config), - m_stats(m_name), + m_stats(name), m_prebuf_current(config.prebuffering) { RC_ADD_PARAMETER(enable, "If the input is enabled. Set to zero to empty the buffer."); |