diff options
-rw-r--r-- | mpm/python/usrp_mpm/sys_utils/filesystem_status.py | 67 | ||||
-rw-r--r-- | mpm/tools/CMakeLists.txt | 1 | ||||
-rwxr-xr-x | mpm/tools/check-filesystem | 156 |
3 files changed, 224 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/sys_utils/filesystem_status.py b/mpm/python/usrp_mpm/sys_utils/filesystem_status.py new file mode 100644 index 000000000..865741316 --- /dev/null +++ b/mpm/python/usrp_mpm/sys_utils/filesystem_status.py @@ -0,0 +1,67 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Utilities for checking the filesystem status +""" + +import hashlib +import pathlib +import subprocess +import time + +def get_uhd_version(filesystem_root='/'): + def parse_uhd_version(versionstring): + assert(versionstring[0:4] == 'UHD ') + array = versionstring[4:].split('-') + assert(len(array) == 3) + return { + 'version': array[0], + 'appendix': array[1], + 'githash': array[2], + } + file = pathlib.Path(filesystem_root, 'usr/bin/uhd_config_info') + versionstring = subprocess.check_output([file, '--version']).decode('utf-8').splitlines()[0] + return parse_uhd_version(versionstring) + +def get_mender_artifact(filesystem_root='/', parse_manually=False): + def parse_artifact(output): + for line in output.splitlines(): + if line.startswith('artifact_name='): + return line[14:] + return None + if filesystem_root != '/': + parse_manually = True + if parse_manually: + # parse mender artifact manually + file = pathlib.Path(filesystem_root, 'etc/mender/artifact_info') + if not file.exists(): + return 'FILE NOT FOUND' + return parse_artifact(file.read_text()) + else: + output = subprocess.check_output(['/usr/bin/mender', '-show-artifact']).decode('utf-8') + return output.splitlines()[0] + +def get_fs_version(filesystem_root='/'): + file = pathlib.Path(filesystem_root, 'etc/version') + if not file.exists(): + return 'FILE NOT FOUND' + return file.read_text().splitlines()[0] + +def get_opkg_status_date(date_only=False, filesystem_root='/'): + if date_only: + tformat = "%Y-%m-%d" + else: + tformat = "%Y-%m-%d %H:%M:%S" + file = pathlib.Path(filesystem_root, 'var/lib/opkg/status') + if not file.exists(): + return 'FILE NOT FOUND' + return time.strftime(tformat, time.gmtime(file.stat().st_mtime)) + +def get_opkg_status_md5sum(filesystem_root='/'): + file = pathlib.Path(filesystem_root, 'var/lib/opkg/status') + if not file.exists(): + return 'FILE NOT FOUND' + return hashlib.md5sum(file.read_text()).hexdigest() diff --git a/mpm/tools/CMakeLists.txt b/mpm/tools/CMakeLists.txt index 245720656..387cee46a 100644 --- a/mpm/tools/CMakeLists.txt +++ b/mpm/tools/CMakeLists.txt @@ -7,6 +7,7 @@ install(PROGRAMS mpm_shell.py mpm_debug.py + check-filesystem DESTINATION ${RUNTIME_DIR} ) diff --git a/mpm/tools/check-filesystem b/mpm/tools/check-filesystem new file mode 100755 index 000000000..4129dddac --- /dev/null +++ b/mpm/tools/check-filesystem @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 + +import argparse +import os +import pathlib +import re +import subprocess +import sys +import traceback +import unittest + +from usrp_mpm.sys_utils.filesystem_status import * +from usrp_mpm.sys_utils.dtoverlay import list_overlays +from usrp_mpm.sys_utils.dtoverlay import list_available_overlays + +def parse_list(list_as_string): + list = list_as_string.split(',') + for i,s in enumerate(list): + list[i] = s.strip() + return list + +class SimpleResult(unittest.TextTestResult): + """A test result class with compacter results than the TextTestResult class + """ + + def getDescription(self, test): + return str(test).split(' ')[0] + + def _exc_info_to_string(self, err, test): + """Converts a sys.exc_info()-style tuple of values into a string.""" + exctype, value, tb = err + lines = str(value).splitlines() + if exctype == AssertionError: + lines = [lines[0]] + return '\n'.join(lines) + + def printErrorList(self, flavour, errors): + for test, err in errors: + self.stream.writeln("%s: %s (%s)" + % (flavour, self.getDescription(test), err)) + +class CheckFilesystem(unittest.TestCase): + def _assert_filesystem_root_is_not_set(): + if self.args.filesystem_root: + raise ValueError("Changing the filesystem root is not possible for this test") + + def __init__(self, methodName='runTest', args=None): + super(CheckFilesystem, self).__init__(methodName) + self.args=args + + def test_uhd_version(self): + uhd_version = get_uhd_version(filesystem_root=self.args.filesystem_root) + self.assertEqual(uhd_version['version'], self.args.uhd_version) + + def test_uhd_githash(self): + uhd_version = get_uhd_version(filesystem_root=self.args.filesystem_root) + self.assertEqual(uhd_version['githash'], self.args.uhd_githash) + + def test_mender_artifact(self): + mender_artifact = get_mender_artifact(filesystem_root=self.args.filesystem_root) + self.assertEqual(mender_artifact, self.args.mender_artifact) + + def test_fs_version(self): + fs_version = get_fs_version(filesystem_root=self.args.filesystem_root) + self.assertEqual(fs_version, self.args.fs_version) + + def test_opkg_status_date(self): + if len(self.args.opkg_status_date) == 10: + date_only = True + elif len(self.args.opkg_status_date) == 19: + date_only = False + else: + raise ValueError("invalid date format") + opkg_status_date = get_opkg_status_date(date_only, filesystem_root=self.args.filesystem_root) + self.assertEqual(opkg_status_date, self.args.opkg_status_date) + + def test_opkg_status_md5sum(self): + opkg_status_md5sum = get_opkg_status_md5sum(filesystem_root=self.args.filesystem_root) + self.assertEqual(opkg_status_md5sum, self.args.opkg_status_md5sum) + + def test_dt_overlays_available(self): + _assert_filesystem_root_is_not_set() + dt_overlays_available = list_available_overlays() + dt_overlays_available.sort() + reference = parse_list(self.args.dt_overlays_available) + reference.sort() + self.assertEqual(dt_overlays_available, reference) + + def test_dt_overlays_loaded(self): + _assert_filesystem_root_is_not_set() + dt_overlays_loaded = list(list_overlays(applied_only=True).keys()) + dt_overlays_loaded.sort() + reference = parse_list(self.args.dt_overlays_loaded) + reference.sort() + self.assertEqual(dt_overlays_loaded, reference) + + def test_systemd_init_successful(self): + _assert_filesystem_root_is_not_set() + output = subprocess.check_output(['systemctl', 'is-system-running']).decode('utf-8') + status = output.splitlines()[0] + self.assertEqual(status, 'running') + + def test_mpm_init_successful(self): + _assert_filesystem_root_is_not_set() + p1 = subprocess.Popen(['echo', 'get_init_status'], stdout=subprocess.PIPE) + p2 = subprocess.Popen(['mpm_shell.py', '-c', 'localhost'], stdin=p1.stdout, + stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + p1.stdout.close() + lines = p2.communicate()[0].decode('utf-8').splitlines() + r1=re.compile('^(\S*) \[C\]> < (.*)$') + result = None + for line in lines: + m1 = r1.match(line) + if m1: + result = m1[2] + self.assertEqual(result, "('true', 'No errors.')") + +def args_parser(argv): + parser = argparse.ArgumentParser() + parser.add_argument("--uhd-version", help="Check UHD version") + parser.add_argument("--uhd-githash", help="Check UHD githash") + parser.add_argument("--mender-artifact", help="Check mender artifact") + parser.add_argument("--fs-version", help="Check filesystem version string") + parser.add_argument("--opkg-status-date", help="Check package management status file date") + parser.add_argument("--opkg-status-md5sum", help="Check package management status file md5sum") + parser.add_argument("--dt-overlays-available", help="Check which devicetree overlays are present") + parser.add_argument("--dt-overlays-loaded", help="Check which devicetree overlays are loaded") + parser.add_argument("--systemd-init-successful", help="Check if systemd init was successful", action="store_true") + parser.add_argument("--mpm-init-successful", help="Check if MPM init was successful", action="store_true") + parser.add_argument("--filesystem-root", help="Root of the filesystem to use", default="/") + args = parser.parse_args() + # if not args.filesystem_root: + # args.filesystem_root='/' + return args + +def get_tests(args): + tests = unittest.TestSuite() + for arg,value in vars(args).items(): + # skip arguments which are not a test + if arg in ['filesystem_root']: + continue + if value: + # add e.g. "test_uhd_version" + tests.addTest(CheckFilesystem('test_' + arg, args)) + return tests + +def main(argv): + if len(argv) == 1: + # display help if no arguments were provided + argv.append('-h') + args = args_parser(argv) + test_runner = unittest.TextTestRunner(verbosity=0, resultclass=SimpleResult) + test_result = test_runner.run(get_tests(args)) + +if __name__ == '__main__': + main(sys.argv) |