aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mpm/python/usrp_mpm/sys_utils/filesystem_status.py67
-rw-r--r--mpm/tools/CMakeLists.txt1
-rwxr-xr-xmpm/tools/check-filesystem156
3 files changed, 224 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/sys_utils/filesystem_status.py b/mpm/python/usrp_mpm/sys_utils/filesystem_status.py
new file mode 100644
index 000000000..865741316
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/filesystem_status.py
@@ -0,0 +1,67 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Utilities for checking the filesystem status
+"""
+
+import hashlib
+import pathlib
+import subprocess
+import time
+
+def get_uhd_version(filesystem_root='/'):
+ def parse_uhd_version(versionstring):
+ assert(versionstring[0:4] == 'UHD ')
+ array = versionstring[4:].split('-')
+ assert(len(array) == 3)
+ return {
+ 'version': array[0],
+ 'appendix': array[1],
+ 'githash': array[2],
+ }
+ file = pathlib.Path(filesystem_root, 'usr/bin/uhd_config_info')
+ versionstring = subprocess.check_output([file, '--version']).decode('utf-8').splitlines()[0]
+ return parse_uhd_version(versionstring)
+
+def get_mender_artifact(filesystem_root='/', parse_manually=False):
+ def parse_artifact(output):
+ for line in output.splitlines():
+ if line.startswith('artifact_name='):
+ return line[14:]
+ return None
+ if filesystem_root != '/':
+ parse_manually = True
+ if parse_manually:
+ # parse mender artifact manually
+ file = pathlib.Path(filesystem_root, 'etc/mender/artifact_info')
+ if not file.exists():
+ return 'FILE NOT FOUND'
+ return parse_artifact(file.read_text())
+ else:
+ output = subprocess.check_output(['/usr/bin/mender', '-show-artifact']).decode('utf-8')
+ return output.splitlines()[0]
+
+def get_fs_version(filesystem_root='/'):
+ file = pathlib.Path(filesystem_root, 'etc/version')
+ if not file.exists():
+ return 'FILE NOT FOUND'
+ return file.read_text().splitlines()[0]
+
+def get_opkg_status_date(date_only=False, filesystem_root='/'):
+ if date_only:
+ tformat = "%Y-%m-%d"
+ else:
+ tformat = "%Y-%m-%d %H:%M:%S"
+ file = pathlib.Path(filesystem_root, 'var/lib/opkg/status')
+ if not file.exists():
+ return 'FILE NOT FOUND'
+ return time.strftime(tformat, time.gmtime(file.stat().st_mtime))
+
+def get_opkg_status_md5sum(filesystem_root='/'):
+ file = pathlib.Path(filesystem_root, 'var/lib/opkg/status')
+ if not file.exists():
+ return 'FILE NOT FOUND'
+ return hashlib.md5sum(file.read_text()).hexdigest()
diff --git a/mpm/tools/CMakeLists.txt b/mpm/tools/CMakeLists.txt
index 245720656..387cee46a 100644
--- a/mpm/tools/CMakeLists.txt
+++ b/mpm/tools/CMakeLists.txt
@@ -7,6 +7,7 @@
install(PROGRAMS
mpm_shell.py
mpm_debug.py
+ check-filesystem
DESTINATION ${RUNTIME_DIR}
)
diff --git a/mpm/tools/check-filesystem b/mpm/tools/check-filesystem
new file mode 100755
index 000000000..4129dddac
--- /dev/null
+++ b/mpm/tools/check-filesystem
@@ -0,0 +1,156 @@
+#!/usr/bin/env python3
+
+import argparse
+import os
+import pathlib
+import re
+import subprocess
+import sys
+import traceback
+import unittest
+
+from usrp_mpm.sys_utils.filesystem_status import *
+from usrp_mpm.sys_utils.dtoverlay import list_overlays
+from usrp_mpm.sys_utils.dtoverlay import list_available_overlays
+
+def parse_list(list_as_string):
+ list = list_as_string.split(',')
+ for i,s in enumerate(list):
+ list[i] = s.strip()
+ return list
+
+class SimpleResult(unittest.TextTestResult):
+ """A test result class with compacter results than the TextTestResult class
+ """
+
+ def getDescription(self, test):
+ return str(test).split(' ')[0]
+
+ def _exc_info_to_string(self, err, test):
+ """Converts a sys.exc_info()-style tuple of values into a string."""
+ exctype, value, tb = err
+ lines = str(value).splitlines()
+ if exctype == AssertionError:
+ lines = [lines[0]]
+ return '\n'.join(lines)
+
+ def printErrorList(self, flavour, errors):
+ for test, err in errors:
+ self.stream.writeln("%s: %s (%s)"
+ % (flavour, self.getDescription(test), err))
+
+class CheckFilesystem(unittest.TestCase):
+ def _assert_filesystem_root_is_not_set():
+ if self.args.filesystem_root:
+ raise ValueError("Changing the filesystem root is not possible for this test")
+
+ def __init__(self, methodName='runTest', args=None):
+ super(CheckFilesystem, self).__init__(methodName)
+ self.args=args
+
+ def test_uhd_version(self):
+ uhd_version = get_uhd_version(filesystem_root=self.args.filesystem_root)
+ self.assertEqual(uhd_version['version'], self.args.uhd_version)
+
+ def test_uhd_githash(self):
+ uhd_version = get_uhd_version(filesystem_root=self.args.filesystem_root)
+ self.assertEqual(uhd_version['githash'], self.args.uhd_githash)
+
+ def test_mender_artifact(self):
+ mender_artifact = get_mender_artifact(filesystem_root=self.args.filesystem_root)
+ self.assertEqual(mender_artifact, self.args.mender_artifact)
+
+ def test_fs_version(self):
+ fs_version = get_fs_version(filesystem_root=self.args.filesystem_root)
+ self.assertEqual(fs_version, self.args.fs_version)
+
+ def test_opkg_status_date(self):
+ if len(self.args.opkg_status_date) == 10:
+ date_only = True
+ elif len(self.args.opkg_status_date) == 19:
+ date_only = False
+ else:
+ raise ValueError("invalid date format")
+ opkg_status_date = get_opkg_status_date(date_only, filesystem_root=self.args.filesystem_root)
+ self.assertEqual(opkg_status_date, self.args.opkg_status_date)
+
+ def test_opkg_status_md5sum(self):
+ opkg_status_md5sum = get_opkg_status_md5sum(filesystem_root=self.args.filesystem_root)
+ self.assertEqual(opkg_status_md5sum, self.args.opkg_status_md5sum)
+
+ def test_dt_overlays_available(self):
+ _assert_filesystem_root_is_not_set()
+ dt_overlays_available = list_available_overlays()
+ dt_overlays_available.sort()
+ reference = parse_list(self.args.dt_overlays_available)
+ reference.sort()
+ self.assertEqual(dt_overlays_available, reference)
+
+ def test_dt_overlays_loaded(self):
+ _assert_filesystem_root_is_not_set()
+ dt_overlays_loaded = list(list_overlays(applied_only=True).keys())
+ dt_overlays_loaded.sort()
+ reference = parse_list(self.args.dt_overlays_loaded)
+ reference.sort()
+ self.assertEqual(dt_overlays_loaded, reference)
+
+ def test_systemd_init_successful(self):
+ _assert_filesystem_root_is_not_set()
+ output = subprocess.check_output(['systemctl', 'is-system-running']).decode('utf-8')
+ status = output.splitlines()[0]
+ self.assertEqual(status, 'running')
+
+ def test_mpm_init_successful(self):
+ _assert_filesystem_root_is_not_set()
+ p1 = subprocess.Popen(['echo', 'get_init_status'], stdout=subprocess.PIPE)
+ p2 = subprocess.Popen(['mpm_shell.py', '-c', 'localhost'], stdin=p1.stdout,
+ stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+ p1.stdout.close()
+ lines = p2.communicate()[0].decode('utf-8').splitlines()
+ r1=re.compile('^(\S*) \[C\]> < (.*)$')
+ result = None
+ for line in lines:
+ m1 = r1.match(line)
+ if m1:
+ result = m1[2]
+ self.assertEqual(result, "('true', 'No errors.')")
+
+def args_parser(argv):
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--uhd-version", help="Check UHD version")
+ parser.add_argument("--uhd-githash", help="Check UHD githash")
+ parser.add_argument("--mender-artifact", help="Check mender artifact")
+ parser.add_argument("--fs-version", help="Check filesystem version string")
+ parser.add_argument("--opkg-status-date", help="Check package management status file date")
+ parser.add_argument("--opkg-status-md5sum", help="Check package management status file md5sum")
+ parser.add_argument("--dt-overlays-available", help="Check which devicetree overlays are present")
+ parser.add_argument("--dt-overlays-loaded", help="Check which devicetree overlays are loaded")
+ parser.add_argument("--systemd-init-successful", help="Check if systemd init was successful", action="store_true")
+ parser.add_argument("--mpm-init-successful", help="Check if MPM init was successful", action="store_true")
+ parser.add_argument("--filesystem-root", help="Root of the filesystem to use", default="/")
+ args = parser.parse_args()
+ # if not args.filesystem_root:
+ # args.filesystem_root='/'
+ return args
+
+def get_tests(args):
+ tests = unittest.TestSuite()
+ for arg,value in vars(args).items():
+ # skip arguments which are not a test
+ if arg in ['filesystem_root']:
+ continue
+ if value:
+ # add e.g. "test_uhd_version"
+ tests.addTest(CheckFilesystem('test_' + arg, args))
+ return tests
+
+def main(argv):
+ if len(argv) == 1:
+ # display help if no arguments were provided
+ argv.append('-h')
+ args = args_parser(argv)
+ test_runner = unittest.TextTestRunner(verbosity=0, resultclass=SimpleResult)
+ test_result = test_runner.run(get_tests(args))
+
+if __name__ == '__main__':
+ main(sys.argv)