From 66d0257b2e3a0e4bdb02b2511881863059a1e5a5 Mon Sep 17 00:00:00 2001
From: Martin Braun <martin.braun@ettus.com>
Date: Thu, 11 Jan 2018 18:58:06 -0800
Subject: mpm: Enable systemd watchdog and update it from MPM

- Updated systemd service file
- Added health status flag in shared data object
- Added thread in RPC process to update watchdog

Reviewed-by: Moritz Fischer <moritz.fischer@ettus.com>
---
 mpm/python/usrp_hwd.py                    |  9 +++++++--
 mpm/python/usrp_mpm/mpmtypes.py           |  3 ++-
 mpm/python/usrp_mpm/rpc_server.py         |  9 +++++++++
 mpm/python/usrp_mpm/sys_utils/watchdog.py | 16 ++++++++++++----
 4 files changed, 30 insertions(+), 7 deletions(-)

(limited to 'mpm/python')

diff --git a/mpm/python/usrp_hwd.py b/mpm/python/usrp_hwd.py
index e4f64b2f0..b4d556cfd 100755
--- a/mpm/python/usrp_hwd.py
+++ b/mpm/python/usrp_hwd.py
@@ -15,6 +15,7 @@ from gevent import signal
 from gevent.hub import BlockingSwitchOutError
 import usrp_mpm as mpm
 from usrp_mpm.mpmtypes import SharedState
+from usrp_mpm.sys_utils import watchdog
 
 _PROCESSES = []
 
@@ -147,17 +148,21 @@ def spawn_processes(log, args):
     log.info("Spawning RPC process...")
     _PROCESSES.append(
         mpm.spawn_rpc_process(mpm.mpmtypes.MPM_RPC_PORT, shared, args))
+    log.debug("RPC process has PID: %d", _PROCESSES[-1].pid)
+    if watchdog.has_watchdog():
+        watchdog.transfer_control(_PROCESSES[-1].pid)
     log.info("Spawning discovery process...")
     _PROCESSES.append(
         mpm.spawn_discovery_process(shared, args.discovery_addr)
     )
+    log.debug("Discovery process has PID: %d", _PROCESSES[-1].pid)
     log.info("Processes launched. Registering signal handlers.")
     signal.signal(signal.SIGTERM, kill_time)
     signal.signal(signal.SIGINT, kill_time)
-    signal.pause()
+    for proc in _PROCESSES:
+        proc.join()
     return True
 
-
 def main():
     """
     Go, go, go!
diff --git a/mpm/python/usrp_mpm/mpmtypes.py b/mpm/python/usrp_mpm/mpmtypes.py
index 07e058a62..cf81f4f16 100644
--- a/mpm/python/usrp_mpm/mpmtypes.py
+++ b/mpm/python/usrp_mpm/mpmtypes.py
@@ -24,7 +24,8 @@ class SharedState(object):
     def __init__(self):
         self.lock = RLock()
         self.claim_status = Value(ctypes.c_bool, False, lock=self.lock)
-        # String with max length of 256
+        self.system_ready = Value(ctypes.c_bool, False, lock=self.lock)
+        # String with max length of 256:
         self.claim_token = Array(ctypes.c_char, 256, lock=self.lock)
         self.dev_type = Array(ctypes.c_char, 16, lock=self.lock)
         self.dev_serial = Array(ctypes.c_char, 8, lock=self.lock)
diff --git a/mpm/python/usrp_mpm/rpc_server.py b/mpm/python/usrp_mpm/rpc_server.py
index 5e8601bbd..60de43994 100644
--- a/mpm/python/usrp_mpm/rpc_server.py
+++ b/mpm/python/usrp_mpm/rpc_server.py
@@ -25,6 +25,7 @@ from six import iteritems
 from mprpc import RPCServer
 from usrp_mpm.mpmlog import get_main_logger
 from usrp_mpm.mpmutils import to_binary_str
+from usrp_mpm.sys_utils import watchdog
 
 TIMEOUT_INTERVAL = 3.0 # Seconds before claim expires
 TOKEN_LEN = 16 # Length of the token string
@@ -81,6 +82,14 @@ class MPMServer(RPCServer):
         super(MPMServer, self).__init__(
             pack_params={'use_bin_type': True},
         )
+        self._state.system_ready.value = True
+        self.log.info("RPC server ready!")
+        # Optionally spawn watchdog. Note: In order for us to be able to spawn
+        # the task from this thread, the main process needs to hand control to
+        # us using watchdog.transfer_control().
+        if watchdog.has_watchdog():
+            self.log.info("Spawning watchdog task...")
+            watchdog.spawn_watchdog_task(self._state, self.log)
 
     def _init_rpc_calls(self, mgr):
         """
diff --git a/mpm/python/usrp_mpm/sys_utils/watchdog.py b/mpm/python/usrp_mpm/sys_utils/watchdog.py
index 125a42c62..8f89471dc 100644
--- a/mpm/python/usrp_mpm/sys_utils/watchdog.py
+++ b/mpm/python/usrp_mpm/sys_utils/watchdog.py
@@ -23,14 +23,19 @@ def has_watchdog():
     """
     return bool(os.environ.get('WATCHDOG_USEC', False))
 
-def watchdog_task(shared_state, log):
+def transfer_control(pid):
+    """
+    Transfer control of watchdog notifications to new PID.
+    """
+    daemon.notify("MAINPID={:d}".format(int(pid)))
+
+def _watchdog_task(shared_state, log):
     """
     Continuously ping the watchdog to tell him that we're still alive.
 
     This will keep running until the parent thread dies, or
     shared_state.system_ready gets set to False by someone.
     """
-    log.info("launching task")
     watchdog_timeout = \
             float(os.environ.get(
                 'WATCHDOG_USEC',
@@ -40,9 +45,12 @@ def watchdog_task(shared_state, log):
     daemon.notify("READY=1")
     log.info("READY=1, interval %f", watchdog_interval)
     while shared_state.system_ready.value:
+        # Sleep first, then ping. This avoids the case where transfer_control()
+        # is not yet complete before we call this for the first time, which
+        # would lead to error messages popping up in the systemd journal.
+        time.sleep(watchdog_interval)
         log.trace("Pinging watchdog....")
         daemon.notify("WATCHDOG=1")
-        time.sleep(watchdog_interval)
     log.error("Terminating watchdog thread!")
     return
 
@@ -53,7 +61,7 @@ def spawn_watchdog_task(shared_state, log):
     outlive the main thread.
     """
     task = threading.Thread(
-        target=watchdog_task,
+        target=_watchdog_task,
         args=[shared_state, log],
         name="MPMWatchdogTask",
         daemon=True,
-- 
cgit v1.2.3