From 66d0257b2e3a0e4bdb02b2511881863059a1e5a5 Mon Sep 17 00:00:00 2001 From: Martin Braun Date: Thu, 11 Jan 2018 18:58:06 -0800 Subject: mpm: Enable systemd watchdog and update it from MPM - Updated systemd service file - Added health status flag in shared data object - Added thread in RPC process to update watchdog Reviewed-by: Moritz Fischer --- mpm/python/usrp_hwd.py | 9 +++++++-- mpm/python/usrp_mpm/mpmtypes.py | 3 ++- mpm/python/usrp_mpm/rpc_server.py | 9 +++++++++ mpm/python/usrp_mpm/sys_utils/watchdog.py | 16 ++++++++++++---- mpm/systemd/usrp-hwd.service.in | 6 ++++++ 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/mpm/python/usrp_hwd.py b/mpm/python/usrp_hwd.py index e4f64b2f0..b4d556cfd 100755 --- a/mpm/python/usrp_hwd.py +++ b/mpm/python/usrp_hwd.py @@ -15,6 +15,7 @@ from gevent import signal from gevent.hub import BlockingSwitchOutError import usrp_mpm as mpm from usrp_mpm.mpmtypes import SharedState +from usrp_mpm.sys_utils import watchdog _PROCESSES = [] @@ -147,17 +148,21 @@ def spawn_processes(log, args): log.info("Spawning RPC process...") _PROCESSES.append( mpm.spawn_rpc_process(mpm.mpmtypes.MPM_RPC_PORT, shared, args)) + log.debug("RPC process has PID: %d", _PROCESSES[-1].pid) + if watchdog.has_watchdog(): + watchdog.transfer_control(_PROCESSES[-1].pid) log.info("Spawning discovery process...") _PROCESSES.append( mpm.spawn_discovery_process(shared, args.discovery_addr) ) + log.debug("Discovery process has PID: %d", _PROCESSES[-1].pid) log.info("Processes launched. Registering signal handlers.") signal.signal(signal.SIGTERM, kill_time) signal.signal(signal.SIGINT, kill_time) - signal.pause() + for proc in _PROCESSES: + proc.join() return True - def main(): """ Go, go, go! diff --git a/mpm/python/usrp_mpm/mpmtypes.py b/mpm/python/usrp_mpm/mpmtypes.py index 07e058a62..cf81f4f16 100644 --- a/mpm/python/usrp_mpm/mpmtypes.py +++ b/mpm/python/usrp_mpm/mpmtypes.py @@ -24,7 +24,8 @@ class SharedState(object): def __init__(self): self.lock = RLock() self.claim_status = Value(ctypes.c_bool, False, lock=self.lock) - # String with max length of 256 + self.system_ready = Value(ctypes.c_bool, False, lock=self.lock) + # String with max length of 256: self.claim_token = Array(ctypes.c_char, 256, lock=self.lock) self.dev_type = Array(ctypes.c_char, 16, lock=self.lock) self.dev_serial = Array(ctypes.c_char, 8, lock=self.lock) diff --git a/mpm/python/usrp_mpm/rpc_server.py b/mpm/python/usrp_mpm/rpc_server.py index 5e8601bbd..60de43994 100644 --- a/mpm/python/usrp_mpm/rpc_server.py +++ b/mpm/python/usrp_mpm/rpc_server.py @@ -25,6 +25,7 @@ from six import iteritems from mprpc import RPCServer from usrp_mpm.mpmlog import get_main_logger from usrp_mpm.mpmutils import to_binary_str +from usrp_mpm.sys_utils import watchdog TIMEOUT_INTERVAL = 3.0 # Seconds before claim expires TOKEN_LEN = 16 # Length of the token string @@ -81,6 +82,14 @@ class MPMServer(RPCServer): super(MPMServer, self).__init__( pack_params={'use_bin_type': True}, ) + self._state.system_ready.value = True + self.log.info("RPC server ready!") + # Optionally spawn watchdog. Note: In order for us to be able to spawn + # the task from this thread, the main process needs to hand control to + # us using watchdog.transfer_control(). + if watchdog.has_watchdog(): + self.log.info("Spawning watchdog task...") + watchdog.spawn_watchdog_task(self._state, self.log) def _init_rpc_calls(self, mgr): """ diff --git a/mpm/python/usrp_mpm/sys_utils/watchdog.py b/mpm/python/usrp_mpm/sys_utils/watchdog.py index 125a42c62..8f89471dc 100644 --- a/mpm/python/usrp_mpm/sys_utils/watchdog.py +++ b/mpm/python/usrp_mpm/sys_utils/watchdog.py @@ -23,14 +23,19 @@ def has_watchdog(): """ return bool(os.environ.get('WATCHDOG_USEC', False)) -def watchdog_task(shared_state, log): +def transfer_control(pid): + """ + Transfer control of watchdog notifications to new PID. + """ + daemon.notify("MAINPID={:d}".format(int(pid))) + +def _watchdog_task(shared_state, log): """ Continuously ping the watchdog to tell him that we're still alive. This will keep running until the parent thread dies, or shared_state.system_ready gets set to False by someone. """ - log.info("launching task") watchdog_timeout = \ float(os.environ.get( 'WATCHDOG_USEC', @@ -40,9 +45,12 @@ def watchdog_task(shared_state, log): daemon.notify("READY=1") log.info("READY=1, interval %f", watchdog_interval) while shared_state.system_ready.value: + # Sleep first, then ping, that avoids the case where transfer_control() + # is not yet complete before we call this for the first time, which + # would lead in error messages popping up in the systemd journal. + time.sleep(watchdog_interval) log.trace("Pinging watchdog....") daemon.notify("WATCHDOG=1") - time.sleep(watchdog_interval) log.error("Terminating watchdog thread!") return @@ -53,7 +61,7 @@ def spawn_watchdog_task(shared_state, log): outlive the main thread. """ task = threading.Thread( - target=watchdog_task, + target=_watchdog_task, args=[shared_state, log], name="MPMWatchdogTask", daemon=True, diff --git a/mpm/systemd/usrp-hwd.service.in b/mpm/systemd/usrp-hwd.service.in index 91ba942a2..4da4a31dc 100644 --- a/mpm/systemd/usrp-hwd.service.in +++ b/mpm/systemd/usrp-hwd.service.in @@ -3,6 +3,12 @@ Description=USRP Hardware Daemon (MPM) [Service] ExecStart=@CMAKE_INSTALL_PREFIX@/bin/usrp_hwd.py +WatchdogSec=30 +Type=notify +Restart=on-failure +StartLimitInterval=2min +StartLimitBurst=4 +StartLimitAction=none [Install] WantedBy=multi-user.target -- cgit v1.2.3