diff options
Diffstat (limited to 'mpm/python')
| -rwxr-xr-x | mpm/python/e320_bist | 769 | 
1 files changed, 769 insertions, 0 deletions
| diff --git a/mpm/python/e320_bist b/mpm/python/e320_bist new file mode 100755 index 000000000..8ee5b9b91 --- /dev/null +++ b/mpm/python/e320_bist @@ -0,0 +1,769 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +E320 Built-In Self Test (BIST) + +""" + +from __future__ import print_function +import os +import sys +import subprocess +import re +import socket +import select +import time +import json +from datetime import datetime +import argparse +from six import iteritems + +# Timeout values are in seconds: +GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute" +GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test +                       # does not necessarily require GPS lock to pass, we +                       # reduce this value in order for the BIST to pass faster +                       # by default. + +############################################################################## +# Aurora/SFP BIST code +############################################################################## +def get_sfp_bist_defaults(): +    " Default dictionary for SFP/Aurora BIST dry-runs " +    return { +        'elapsed_time': 1.0, +        'max_roundtrip_latency': 0.8e-6, +        'throughput': 1000e6, +        'max_ber': 8.5e-11, +        'errors': 0, +        'bits': 12012486656, +    } + +def run_aurora_bist(master, slave=None): +    """ +    Spawn a BER test +    """ +    from usrp_mpm import aurora_control +    from usrp_mpm.sys_utils.uio import UIO +    try: +        master_au_uio = UIO(label=master, read_only=False) +        master_au_uio.open() +        master_au_ctrl = aurora_control.AuroraControl(master_au_uio) +        if slave is not None: +            slave_au_uio = UIO(label=slave, read_only=False) +            slave_au_uio.open() +        slave_au_ctrl = None if slave is None else aurora_control.AuroraControl( +            slave_au_uio +        ) +        return master_au_ctrl.run_ber_loopback_bist( +            duration=10, +            requested_rate=1300 * 8e6, +            slave=slave_au_ctrl, +        ) +    except Exception as ex: +        print("Unexpected exception: {}".format(str(ex))) +        exit(1) +    finally: +        master_au_uio.close() +        if slave is not None: +            slave_au_uio.close() + +def aurora_results_to_status(bist_results): +    """ +    Convert a dictionary coming from AuroraControl BIST to one that we can use +    for this BIST +    """ +    return bist_results['mst_errors'] == 0, { +        'elapsed_time': bist_results['time_elapsed'], +        'max_roundtrip_latency': bist_results['mst_latency_us'], +        'throughput': bist_results['approx_throughput'], +        'max_ber': bist_results['max_ber'], +        'errors': bist_results['mst_errors'], +        'bits': bist_results['mst_samps'], +    } + +############################################################################## +# Helpers +############################################################################## +def post_results(results): +    """ +    Given a dictionary, post the results. + +    This will print the results as JSON to stdout. +    """ +    print(json.dumps( +        results, +        sort_keys=True, +        indent=4, +        separators=(',', ': ') +    )) + +def filter_results_for_lv(results): +    """ +    The LabView JSON parser does not support a variety of things, such as +    nested dicts, and some downstream LV applications freak out if certain keys +    are not what they expect. +    This is a long hard-coded list of how results should look like for those +    cases. Note: This list needs manual supervision and attention for the case +    where either subsystems get renamed, or other architectural changes should +    occur. +    """ +    lv_compat_format = { +        'ddr3': { +            'throughput': -1, +        }, +        'gpsdo': { +            "class": "", +            "time": "", +            "ept": -1, +            "lat": -1, +            "lon": -1, +            "alt": -1, +            "epx": -1, +            "epy": -1, +            "epv": -1, +            "track": -1, +            "speed": -1, +            "climb": -1, +            "eps": -1, +            "mode": -1, +        }, +        'tpm': { +            'tpm0_caps': "", +        }, +        'sfp_loopback': { +            'elapsed_time': -1, +            'max_roundtrip_latency': -1, +            'throughput': -1, +            'max_ber': -1, +            'errors': -1, +            'bits': -1, +        }, +        'gpio': { +            'write_patterns': [], +            'read_patterns': [], +        }, +        'temp': { +            'fpga-thermal-zone': -1, +        }, +        'fan': { +            'cooling_device0': -1, +            'cooling_device1': -1, +        }, +    } +    # OK now go and brush up the results: +    def fixup_dict(result_dict, ref_dict): +        """ +        Touches up result_dict according to ref_dict by the following rules: +        - If a key is in result_dict that is not in ref_dict, delete that +        - If a key is in ref_dict that is not in result_dict, use the value +          from ref_dict +        """ +        ref_dict['error_msg'] = "" +        ref_dict['status'] = False +        result_dict = { +            k: v for k, v in iteritems(result_dict) +            if k in ref_dict or k in ('error_msg', 'status') +        } +        result_dict = { +            k: result_dict.get(k, ref_dict[k]) for k in ref_dict +        } +        return result_dict +    results = { +        testname: fixup_dict(testresults, lv_compat_format[testname]) \ +                    if testname in lv_compat_format else testresults +        for testname, testresults in iteritems(results) +    } +    return results + +def sock_read_line(my_sock, timeout=60, interval=0.1): +    """ +    Read from a socket until newline. If there was no newline until the timeout +    occurs, raise an error. Otherwise, return the line. +    """ +    line = b'' +    end_time = time.time() + timeout +    while time.time() < end_time: +        socket_ready = select.select([my_sock], [], [], 0)[0] +        if socket_ready: +            next_char = my_sock.recv(1) +            if next_char == b'\n': +                return line.decode('ascii') +            line += next_char +        else: +            time.sleep(interval) +    raise RuntimeError("sock_read_line() exceeded read timeout!") + +def poll_with_timeout(state_check, timeout_ms, interval_ms): +    """ +    Calls state_check() every interval_ms until it returns a positive value, or +    until a timeout is exceeded. + +    Returns True if state_check() returned True within the timeout. +    """ +    max_time = time.time() + (float(timeout_ms) / 1000) +    interval_s = float(interval_ms) / 1000 +    while time.time() < max_time: +        if state_check(): +            return True +        time.sleep(interval_s) +    return False + +def expand_options(option_list): +    """ +    Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', +    'spam': 'eggs'}. +    """ +    return dict(x.split('=') for x in option_list) + +############################################################################## +# Bist class +############################################################################## +class E320BIST(object): +    """ +    BIST Tool for the USRP E320 +    """ +    # This defines special tests that are really collections of other tests. +    collections = { +        'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", "ref_clock_int"], +        'extended': "*", +    } + +    @staticmethod +    def make_arg_parser(): +        """ +        Return arg parser +        """ +        parser = argparse.ArgumentParser( +            description="E320 BIST Tool", +        ) +        parser.add_argument( +            '-n', '--dry-run', action='store_true', +            help="Fake out the tests. All tests will return a valid" \ +                 " response, but will not actually interact with hardware.", +        ) +        parser.add_argument( +            '-v', '--verbose', action='store_true', +            help="Crank up verbosity level", +        ) +        parser.add_argument( +            '--debug', action='store_true', +            help="For debugging this tool.", +        ) +        parser.add_argument( +            '--option', '-o', action='append', default=[], +            help="Option for individual test.", +        ) +        parser.add_argument( +            '--lv-compat', action='store_true', +            help="Provides compatibility with the LV JSON parser. Don't run " +                 "this mode unless you know what you're doing. The JSON " +                 "output does not necessarily reflect the actual system " +                 "status when using this mode.", +        ) +        parser.add_argument( +            'tests', +            help="List the tests that should be run", +            nargs='+', # There has to be at least one +        ) +        return parser + +    def __init__(self): +        self.args = E320BIST.make_arg_parser().parse_args() +        self.args.option = expand_options(self.args.option) +        try: +            from usrp_mpm.periph_manager.e320 import e320 +            default_rev = e320.mboard_max_rev +        except ImportError: +            # This means we're in dry run mode or something like that, so just +            # pick something +            default_rev = 3 +        self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) +        self.tests_to_run = set() +        for test in self.args.tests: +            if test in self.collections: +                for test in self.expand_collection(test): +                    self.tests_to_run.add(test) +            else: +                self.tests_to_run.add(test) +        try: +            # Keep this import here so we can do dry-runs without any MPM code +            from usrp_mpm import get_main_logger +            if not self.args.verbose: +                from usrp_mpm.mpmlog import WARNING +                get_main_logger().setLevel(WARNING) +            self.log = get_main_logger().getChild('main') +        except ImportError: +            print("No logging capability available.") + +    def expand_collection(self, coll): +        """ +        Return names of tests in a collection +        """ +        tests = self.collections[coll] +        if tests == "*": +            tests = {x.replace('bist_', '') +                     for x in dir(self) +                     if x.find('bist_') == 0 +                    } +        else: +            tests = set(tests) +        return tests + +    def run(self): +        """ +        Execute tests. + +        Returns True on Success. +        """ +        def execute_test(testname): +            """ +            Actually run a test. +            """ +            testmethod_name = "bist_{0}".format(testname) +            sys.stderr.write( +                "Executing test method: {0}\n\n".format(testmethod_name) +            ) +            try: +                status, data = getattr(self, testmethod_name)() +                data['status'] = status +                data['error_msg'] = data.get('error_msg', '') +                return status, data +            except AttributeError: +                sys.stderr.write("Test not defined: {}\n".format(testname)) +                return False, {} +            except Exception as ex: +                sys.stderr.write( +                    "Test {} failed to execute: {}\n".format(testname, str(ex)) +                ) +                if self.args.debug: +                    raise +                return False, {'error_msg': str(ex)} +        tests_successful = True +        result = {} +        for test in self.tests_to_run: +            status, result_data = execute_test(test) +            tests_successful = tests_successful and status +            result[test] = result_data +        if self.args.lv_compat: +            result = filter_results_for_lv(result) +        post_results(result) +        return tests_successful + +############################################################################# +# BISTS +# All bist_* methods must return True/False success values! +############################################################################# +    def bist_rtc(self): +        """ +        BIST for RTC (real time clock) + +        Return dictionary: +        - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 +                format, as a string. As if running 'date -Iseconds -u'. +        - time: Same time, but in seconds since epoch. + +        Return status: +        Unless datetime throws an exception, returns True. +        """ +        assert 'rtc' in self.tests_to_run +        utc_now = datetime.utcnow() +        return True, { +            'time': time.mktime(utc_now.timetuple()), +            'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", +        } + +    def bist_gyro(self): +        """ +        BIST for GYRO (MPU9250) + +        Return dictionary: + +        Return status: True if MPU9250 is detected. +        """ +        assert 'gyro' in self.tests_to_run +        if self.args.dry_run: +            return True, {'device_name': 'dry_run mpu9250'} +        import pyudev +        context = pyudev.Context() +        result = { +            'device_name': device.get('OF_NAME') +            for device in context.list_devices(subsystem='iio', DEVTYPE='iio_device') +            if 'mpu9250' in device.get('OF_NAME') is not None +        } +        if len(result) < 1: +            result['error_msg'] = "No GYRO detected!" +        return 'error_msg' not in result, result + + +    def bist_ddr3(self): +        """ +        BIST for PL DDR3 DRAM +        Description: Calls a test to examine the speed of the DDR3. To be +        precise, it fires up a UHD session, which runs a DDR3 BiST internally. +        If that works, it'll return estimated throughput that was gathered +        during the DDR3 BiST. + +        External Equipment: None + +        Return dictionary: +        - throughput: The estimated throughput in bytes/s + +        Return status: +        True if the DDR3 bist passed +        """ +        assert 'ddr3' in self.tests_to_run +        if self.args.dry_run: +            return True, {'throughput': 1250e6} +        result = {} +        ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' +        try: +            output = subprocess.check_output( +                ddr3_bist_executor, +                stderr=subprocess.STDOUT, +                shell=True, +            ) +        except subprocess.CalledProcessError as ex: +            # Don't throw errors from uhd_usrp_probe +            output = ex.output +        output = output.decode("utf-8") +        mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output) +        if mobj is not None: +            result['throughput'] = float(mobj.group('thrup')) * 1000 +        else: +            result['throughput'] = 0 +            result['error_msg'] = result.get('error_msg', '') + \ +                                        "\n\nFailed match throughput regex!" +        return result.get('throughput', 0) > 1000e3, result + +    def bist_gpsdo(self): +        """ +        BIST for GPSDO +        Description: Returns GPS information +        External Equipment: None; Recommend attaching an antenna or providing +                           fake GPS information + +        Return dictionary: A TPV dictionary as returned by gpsd. +        See also: http://www.catb.org/gpsd/gpsd_json.html + +        Check for mode 2 or 3 to see if it's locked. +        """ +        assert 'gpsdo' in self.tests_to_run +        if self.args.dry_run: +            return True, { +                "class": "TPV", +                "time": "2014-30T11:48:20.10Z", +                "ept": 0.005, +                "lat": 30.407899, +                "lon": -97.726634, +                "alt": 1327.689, +                "epx": 15.319, +                "epy": 17.054, +                "epv": 124.484, +                "track": 10.3797, +                "speed": 0.091, +                "climb": -0.085, +                "eps": 34.11, +                "mode": 3 +            } +        from usrp_mpm.periph_manager import e320 +        # Turn on GPS, give some time to acclimatize +        print("Entered gpsdo test") +        mb_regs = e320.MboardRegsControl(e320.e320.mboard_regs_label, self.log) +        mb_regs.enable_gps(True) +        time.sleep(5) +        gps_warmup_timeout = float( +            self.args.option.get('gps_warmup_timeout', GPS_WARMUP_TIMEOUT)) +        gps_lockok_timeout = float( +            self.args.option.get('gps_lockok_timeout', GPS_LOCKOK_TIMEOUT)) +        # Wait for WARMUP to go low +        sys.stderr.write( +            "Waiting for WARMUP to go low for up to {} seconds...\n".format( +                gps_warmup_timeout)) +        if not poll_with_timeout( +                lambda: not bool((mb_regs.get_gps_status() >> 4) & 0x1), +                gps_warmup_timeout*1000, 1000 +            ): +            raise RuntimeError( +                "GPS-WARMUP did not go low within {} seconds!".format( +                    gps_warmup_timeout)) +        sys.stderr.write("Chip is warmed up.\n") +        # Wait for LOCKOK. Data sheet says wait up to 15 minutes for GPS lock. +        sys.stderr.write( +            "Waiting for LOCKOK to go high for up to {} seconds...\n".format( +                gps_lockok_timeout)) +        if not poll_with_timeout( +                mb_regs.get_gps_locked_val, +                gps_lockok_timeout*1000, +                1000 +            ): +            sys.stderr.write("No GPS-LOCKOK!\n") +        gps_status = mb_regs.get_gps_status() +        sys.stderr.write("GPS-SURVEY status: {}\n".format( +            (gps_status >> 3) & 0x1 +        )) +        sys.stderr.write("GPS-PHASELOCK status: {}\n".format( +            (gps_status >> 2) & 0x1 +        )) +        sys.stderr.write("GPS-ALARM status: {}\n".format( +            (gps_status >> 1) & 0x1 +        )) +        # Now read back response from chip +        my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +        my_sock.connect(('localhost', 2947)) +        sys.stderr.write("Connected to GPSDO socket.\n") +        query_cmd = b'?WATCH={"enable":true,"json":true}' +        my_sock.sendall(query_cmd) +        sys.stderr.write("Sent query: {}\n".format(query_cmd)) +        sock_read_line(my_sock, timeout=10) +        sys.stderr.write("Received initial newline.\n") +        result = {} +        while result.get('class', None) != 'TPV': +            json_result = sock_read_line(my_sock, timeout=60) +            sys.stderr.write( +                "Received JSON response: {}\n\n".format(json_result) +            ) +            result = json.loads(json_result) +        my_sock.sendall(b'?WATCH={"enable":false}') +        my_sock.close() +        # If we reach this line, we have a valid result and the chip responded. +        # However, it doesn't necessarily mean we had a GPS lock. +        return True, result + +    def bist_tpm(self): +        """ +        BIST for TPM (Trusted Platform Module) + +        This reads the caps value for all detected TPM devices. + +        Return dictionary: +        - tpm<N>_caps: TPM manufacturer and version info. Is a multi-line +                       string. + +        Return status: True if exactly one TPM device is detected. +        """ +        assert 'tpm' in self.tests_to_run +        if self.args.dry_run: +            return True, { +                'tpm0_caps': "Fake caps value\n\nVersion 0.0.0", +            } +        result = {} +        props_to_read = ('caps',) +        base_path = '/sys/class/tpm' +        for tpm_device in os.listdir(base_path): +            if tpm_device.startswith('tpm'): +                for key in props_to_read: +                    result['{}_{}'.format(tpm_device, key)] = open( +                        os.path.join(base_path, tpm_device, key), 'r' +                    ).read().strip() +        return len(result) == 1, result + +    def bist_ref_clock_int(self): +        """ +        BIST for clock lock from internal (20MHz). +        Description: Checks to see if we can lock to an internal +        clock source. + +        External Equipment: None +        Return dictionary: +        - <sensor-name>: +          - locked: Boolean lock status + +        There can be multiple ref lock sensors; for a pass condition they all +        need to be asserted. +        """ +        assert 'ref_clock_int' in self.tests_to_run +        if self.args.dry_run: +            return True, {'ref_locked': True} +        # RevB only: Read register from FPGA for lock status. +        result = {} +        from usrp_mpm.periph_manager import e320, e320_periphs +        mb_regs = e320_periphs.MboardRegsControl(e320.e320.mboard_regs_label, self.log) +        mb_regs.set_clock_source('internal', 20e6) +        time.sleep(5) +        if mb_regs.get_refclk_lock(): +            result = {'ref_locked': mb_regs.get_refclk_lock()} +        if len(result) < 1: +            result['error_msg'] = "Reference Clock not locked" +        return 'error_msg' not in result, result + +    def bist_ref_clock_ext(self): +        """ +        BIST for clock lock from external (10MHz) source. +        Description: Checks to see if we can lock to an external +        clock source. + +        External Equipment: External 10 MHz reference clock needed (from Octoclock) +        Return dictionary: +        - <sensor-name>: +          - locked: Boolean lock status + +        There can be multiple ref lock sensors; for a pass condition they all +        need to be asserted. +        """ +        assert 'ref_clock_ext' in self.tests_to_run +        if self.args.dry_run: +            return True, {'ref_locked': True} +        # RevB only: Read register from FPGA for lock status. +        result = {} +        from usrp_mpm.periph_manager import e320, e320_periphs +        mb_regs = e320_periphs.MboardRegsControl(e320.e320.mboard_regs_label, self.log) +        mb_regs.set_clock_source('external', 10e6) +        time.sleep(5) +        if mb_regs.get_refclk_lock(): +            result = {'ref_locked': mb_regs.get_refclk_lock()} +        if len(result) < 1: +            result['error_msg'] = "Reference Clock not locked" +        return 'error_msg' not in result, result + +    def bist_sfp_loopback(self): +        """ +        BIST for SFP+ ports: +        Description: Uses one SFP+ port in loopback mode. + +        External Equipment: Loopback module in SFP required +        required. + +        Return dictionary: +        - elapsed_time: Float value, test time in seconds +        - max_roundtrip_latency: Float value, max roundtrip latency in seconds +        - throughput: Approximate data throughput in bytes/s +        - max_ber: Estimated maximum BER, float value. +        - errors: Number of errors +        - bits: Number of bits that were transferred +        """ +        if self.args.dry_run: +            return True, get_sfp_bist_defaults() +        sfp_bist_results = run_aurora_bist(master='misc-auro-regs') +        return aurora_results_to_status(sfp_bist_results) + +    def bist_gpio(self): +        """ +        BIST for GPIO +        Description: Writes and reads the values to the GPIO + +        Needed Equipment: External loopback as follows +            GPIO +            0<->4 +            1<->5 +            2<->6 +            3<->7 + +        Return dictionary: +        - write_patterns: A list of patterns that were written +        - read_patterns: A list of patterns that were read back +        """ +        assert 'gpio' in self.tests_to_run +        GPIO_WIDTH = 8 +        patterns = range(16) +        if self.args.dry_run: +            return True, { +                'write_patterns': list(patterns), +                'read_patterns': list(patterns), +            } +        from usrp_mpm.periph_manager import e320, e320_periphs +        mb_regs = e320_periphs.MboardRegsControl(e320.e320.mboard_regs_label, self.log) +        mb_regs.enable_fp_gpio(True) +        mb_regs.set_fp_gpio_voltage(2.5) +        mb_regs.set_fp_gpio_master(0xFF) +        # Allow some time for the front-panel GPIOs to become usable +        time.sleep(1) +        ddr1 = 0xf0 +        ddr2 = 0x0f +        def _run_gpio(ddr, patterns): +            " Run a GPIO test for a given set of patterns " +            gpio_ctrl = e320_periphs.FrontpanelGPIO(ddr) +            for pattern in patterns: +                gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) +                time.sleep(0.1) +                gpio_rb = gpio_ctrl.get_all() +                if pattern != gpio_rb: +                    return False, {'write_patterns': [pattern], +                                   'read_patterns': [gpio_rb]} +            return True, {'write_patterns': list(patterns), +                          'read_patterns': list(patterns)} +        status, data = _run_gpio(ddr1, patterns) +        if not status: +            return status, data +        status, data = _run_gpio(ddr2, patterns) +        return status, data + +    def bist_temp(self): +        """ +        BIST for temperature sensors +        Description: Reads the temperature sensors on the motherboards and +        returns their values in mC + +        Return dictionary: +        - <thermal-zone-name>: temp in mC +        """ +        assert 'temp' in self.tests_to_run +        if self.args.dry_run: +            return True, {'fpga-thermal-zone': 30000} +        import pyudev +        context = pyudev.Context() +        result = { +            device.attributes.get('type').decode('ascii'): \ +                    int(device.attributes.get('temp').decode('ascii')) +            for device in context.list_devices(subsystem='thermal') +            if 'temp' in device.attributes.available_attributes \ +                    and device.attributes.get('temp') is not None +        } +        if len(result) < 1: +            result['error_msg'] = "No temperature sensors found!" +        return 'error_msg' not in result, result + +    def bist_fan(self): +        """ +        BIST for temperature sensors +        Description: Reads the RPM values of the fans on the motherboard + +        Return dictionary: +        - <fan-name>: Fan speed in RPM + +        External Equipment: None +        """ +        assert 'fan' in self.tests_to_run +        if self.args.dry_run: +            return True, {'cooling_device0': 10000, 'cooling_device1': 10000} +        import pyudev +        context = pyudev.Context() +        result = { +            device.sys_name: int(device.attributes.get('cur_state')) +            for device in context.list_devices(subsystem='thermal') +            if 'cur_state' in device.attributes.available_attributes \ +                    and device.attributes.get('cur_state') is not None +        } +        return len(result) == 2, result + + +def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): +    """Helper function for set gpio. +    What this function do is take decimal value and convert to a binary string +    then try to set those individual bits to the gpio_bank. +    Arguments: +        gpio_bank  -- gpio bank type. +        value -- value to set onto gpio bank. +        gpio_size -- size of the gpio bank +        ddr_mask  -- data direction register bit mask. 0 is input; 1 is output. +    """ +    ddr_size = bin(ddr_mask).count("1") +    value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] +    ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] +    for i in range(gpio_size): +        if ddr_bitstring[gpio_size - 1 - i] == "1": +            gpio_bank.set(i, value_bitstring[i % ddr_size]) + +############################################################################## +# main +############################################################################## +def main(): +    " Go, go, go! " +    return E320BIST().run() + +if __name__ == '__main__': +    exit(not main()) | 
