diff options
| author | Martin Braun <martin.braun@ettus.com> | 2018-10-12 17:51:54 -0700 | 
|---|---|---|
| committer | Brent Stapleton <bstapleton@g.hmc.edu> | 2018-10-24 18:53:16 -0700 | 
| commit | bf353515b6f26fa280f6a8d3fed477cc97e4a7ec (patch) | |
| tree | 837f915c790bcd3463444cfe336d3654e59f6b53 /mpm | |
| parent | 12dd0939fcc877e4f30c045ead6b2d8fc2f63772 (diff) | |
| download | uhd-bf353515b6f26fa280f6a8d3fed477cc97e4a7ec.tar.gz uhd-bf353515b6f26fa280f6a8d3fed477cc97e4a7ec.tar.bz2 uhd-bf353515b6f26fa280f6a8d3fed477cc97e4a7ec.zip | |
mpm: e320: n3xx: Factor BIST code to common module
Diffstat (limited to 'mpm')
| -rwxr-xr-x | mpm/python/e320_bist | 561 | ||||
| -rwxr-xr-x | mpm/python/n3xx_bist | 697 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/bist.py | 598 | 
3 files changed, 786 insertions, 1070 deletions
| diff --git a/mpm/python/e320_bist b/mpm/python/e320_bist index ef7c85980..ad5b2163a 100755 --- a/mpm/python/e320_bist +++ b/mpm/python/e320_bist @@ -6,21 +6,12 @@  #  """  E320 Built-In Self Test (BIST) -  """  from __future__ import print_function -import os  import sys -import subprocess -import re -import socket -import select  import time -import json -from datetime import datetime -import argparse -from six import iteritems +from usrp_mpm import bist  # Timeout values are in seconds:  GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute" @@ -29,113 +20,30 @@ GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test                         # reduce this value in order for the BIST to pass faster                         # by default.  # Temperature Sensor Mapping based on location -temp_sensor_map = { +TEMP_SENSOR_MAP = {      "thermal_zone0" : "internal",      "thermal_zone1" : "rf_channelA",      "thermal_zone2" : "fpga",      "thermal_zone3" : "rf_channelB",      "thermal_zone4" : "main_power"  } -############################################################################## -# Aurora/SFP BIST code -############################################################################## -def get_sfp_bist_defaults(): -    " Default dictionary for SFP/Aurora BIST dry-runs " -    return { -        'elapsed_time': 1.0, -        'max_roundtrip_latency': 0.8e-6, -        'throughput': 1000e6, -        'max_ber': 8.5e-11, -        'errors': 0, -        'bits': 12012486656, -    } - -def assert_aurora_image(master, slave): -    """ -    Make sure we have an FPGA image with which we can run the requested tests. - -    Will load an AA image if not, which always satisfies all conditions for -    running Aurora tests. -    """ -    from usrp_mpm.sys_utils import uio -    if not uio.find_uio_device(master)[0] or \ -            (slave is not None and not uio.find_uio_device(slave)[0]): -        load_fpga_image('AA') - -def run_aurora_bist(master, slave=None): -    """ -    Spawn a BER test -    """ -    from usrp_mpm import aurora_control -    from usrp_mpm.sys_utils.uio import open_uio - -    class DummyContext(object): -        """Dummy class for context managers""" -        def __enter__(self): -            return - -        def __exit__(self, exc_type, exc_value, traceback): -            return exc_type is None - -    # Go, go, go! -    try: -        assert_aurora_image(master, slave) -        with open_uio(label=master, read_only=False) as master_au_uio: -            master_au_ctrl = aurora_control.AuroraControl(master_au_uio) -            with open_uio(label=slave, read_only=False)\ -                    if slave is not None else DummyContext() as slave_au_uio: -                slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\ -                    if slave is not None else None -                return master_au_ctrl.run_ber_loopback_bist( -                    duration=10, -                    requested_rate=1300 * 8e6, -                    slave=slave_au_ctrl, -                ) -    except Exception as ex: -        print("Unexpected exception: {}".format(str(ex))) -        exit(1) - - -def aurora_results_to_status(bist_results): -    """ -    Convert a dictionary coming from AuroraControl BIST to one that we can use -    for this BIST -    """ -    return bist_results['mst_errors'] == 0, { -        'elapsed_time': bist_results['time_elapsed'], -        'max_roundtrip_latency': bist_results['mst_latency_us'], -        'throughput': bist_results['approx_throughput'], -        'max_ber': bist_results['max_ber'], -        'errors': bist_results['mst_errors'], -        'bits': bist_results['mst_samps'], -    }  ############################################################################## -# Helpers +# Bist class  ############################################################################## -def post_results(results): -    """ -    Given a dictionary, post the results. - -    This will print the results as JSON to stdout. -    """ -    print(json.dumps( -        results, -        sort_keys=True, -        indent=4, -        separators=(',', ': ') -    )) - -def filter_results_for_lv(results): +class E320BIST(bist.UsrpBIST):      """ -    The LabView JSON parser does not support a variety of things, such as -    nested dicts, and some downstream LV applications freak out if certain keys -    are not what they expect. -    This is a long hard-coded list of how results should look like for those -    cases. Note: This list needs manual supervision and attention for the case -    where either subsystems get renamed, or other architectural changes should -    occur. +    BIST Tool for the USRP E320      """ +    usrp_type = "E320" +    # This defines special tests that are really collections of other tests. +    collections = { +        'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", +                     "ref_clock_int"], +        'extended': "*", +    } +    # Default FPGA image type +    DEFAULT_FPGA_TYPE = '1G'      lv_compat_format = {          'ddr3': {              'throughput': -1, @@ -172,250 +80,35 @@ def filter_results_for_lv(results):              'read_patterns': [],          },          'temp': { -            temp_sensor_map['thermal_zone0']: -1, -            temp_sensor_map['thermal_zone1']: -1, -            temp_sensor_map['thermal_zone2']: -1, -            temp_sensor_map['thermal_zone3']: -1, -            temp_sensor_map['thermal_zone4']: -1, +            TEMP_SENSOR_MAP['thermal_zone0']: -1, +            TEMP_SENSOR_MAP['thermal_zone1']: -1, +            TEMP_SENSOR_MAP['thermal_zone2']: -1, +            TEMP_SENSOR_MAP['thermal_zone3']: -1, +            TEMP_SENSOR_MAP['thermal_zone4']: -1,          },          'fan': {              'cooling_device0': -1,          },      } -    # OK now go and brush up the results: -    def fixup_dict(result_dict, ref_dict): -        """ -        Touches up result_dict according to ref_dict by the following rules: -        - If a key is in result_dict that is not in ref_dict, delete that -        - If a key is in ref_dict that is not in result_dict, use the value -          from ref_dict -        """ -        ref_dict['error_msg'] = "" -        ref_dict['status'] = False -        result_dict = { -            k: v for k, v in iteritems(result_dict) -            if k in ref_dict or k in ('error_msg', 'status') -        } -        result_dict = { -            k: result_dict.get(k, ref_dict[k]) for k in ref_dict -        } -        return result_dict -    results = { -        testname: fixup_dict(testresults, lv_compat_format[testname]) \ -                    if testname in lv_compat_format else testresults -        for testname, testresults in iteritems(results) -    } -    return results - -def sock_read_line(my_sock, timeout=60, interval=0.1): -    """ -    Read from a socket until newline. If there was no newline until the timeout -    occurs, raise an error. Otherwise, return the line. -    """ -    line = b'' -    end_time = time.time() + timeout -    while time.time() < end_time: -        socket_ready = select.select([my_sock], [], [], 0)[0] -        if socket_ready: -            next_char = my_sock.recv(1) -            if next_char == b'\n': -                return line.decode('ascii') -            line += next_char -        else: -            time.sleep(interval) -    raise RuntimeError("sock_read_line() exceeded read timeout!") - -def poll_with_timeout(state_check, timeout_ms, interval_ms): -    """ -    Calls state_check() every interval_ms until it returns a positive value, or -    until a timeout is exceeded. - -    Returns True if state_check() returned True within the timeout. -    """ -    max_time = time.time() + (float(timeout_ms) / 1000) -    interval_s = float(interval_ms) / 1000 -    while time.time() < max_time: -        if state_check(): -            return True -        time.sleep(interval_s) -    return False - -def expand_options(option_list): -    """ -    Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', -    'spam': 'eggs'}. -    """ -    return dict(x.split('=') for x in option_list) +    device_args = "type=e3xx,addr=127.0.0.1" -############################################################################## -# Bist class -############################################################################## -class E320BIST(object): -    """ -    BIST Tool for the USRP E320 -    """ -    # This defines special tests that are really collections of other tests. -    collections = { -        'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", "ref_clock_int"], -        'extended': "*", -    } -    # Default FPGA image type -    DEFAULT_FPGA_TYPE = '1G' +    def __init__(self): +        super(E320BIST, self).__init__() -    @staticmethod -    def make_arg_parser(): -        """ -        Return arg parser -        """ -        parser = argparse.ArgumentParser( -            description="E320 BIST Tool", -        ) -        parser.add_argument( -            '-n', '--dry-run', action='store_true', -            help="Fake out the tests. All tests will return a valid" \ -                 " response, but will not actually interact with hardware.", -        ) -        parser.add_argument( -            '-v', '--verbose', action='store_true', -            help="Crank up verbosity level", -        ) -        parser.add_argument( -            '--debug', action='store_true', -            help="For debugging this tool.", -        ) -        parser.add_argument( -            '--option', '-o', action='append', default=[], -            help="Option for individual test.", -        ) -        parser.add_argument( -            '--lv-compat', action='store_true', -            help="Provides compatibility with the LV JSON parser. Don't run " -                 "this mode unless you know what you're doing. The JSON " -                 "output does not necessarily reflect the actual system " -                 "status when using this mode.", -        ) -        parser.add_argument( -            '--skip-fpga-reload', action='store_true', -            help="Skip reloading the default FPGA image post-test. Note: by" -                 "specifying this argument, the FPGA image loaded could be " -                 "anything post-test.", -        ) -        parser.add_argument( -            'tests', -            help="List the tests that should be run", -            nargs='+', # There has to be at least one -        ) -        return parser +    def get_mb_periph_mgr(self): +        """Return reference to an e320 periph manager""" +        from usrp_mpm.periph_manager.e320 import e320 +        return e320 -    def __init__(self): -        self.args = E320BIST.make_arg_parser().parse_args() -        self.args.option = expand_options(self.args.option) -        # If this is true, trigger a reload of the default FPGA image -        self.reload_fpga_image = False -        try: -            from usrp_mpm.periph_manager.e320 import e320 -            default_rev = e320.mboard_max_rev -        except ImportError: -            # This means we're in dry run mode or something like that, so just -            # pick something -            default_rev = 3 -        self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) -        self.tests_to_run = set() -        for test in self.args.tests: -            if test in self.collections: -                for test in self.expand_collection(test): -                    self.tests_to_run.add(test) -            else: -                self.tests_to_run.add(test) -        try: -            # Keep this import here so we can do dry-runs without any MPM code -            from usrp_mpm import get_main_logger -            if not self.args.verbose: -                from usrp_mpm.mpmlog import WARNING -                get_main_logger().setLevel(WARNING) -            self.log = get_main_logger().getChild('main') -        except ImportError: -            print("No logging capability available.") - -    def expand_collection(self, coll): -        """ -        Return names of tests in a collection -        """ -        tests = self.collections[coll] -        if tests == "*": -            tests = {x.replace('bist_', '') -                     for x in dir(self) -                     if x.find('bist_') == 0 -                    } -        else: -            tests = set(tests) -        return tests - -    def run(self): -        """ -        Execute tests. +    def get_product_id(self): +        """Return the mboard product ID (e320):""" +        return bist.get_product_id_from_eeprom(valid_ids=['e320']) -        Returns True on Success. -        """ -        def execute_test(testname): -            """ -            Actually run a test. -            """ -            testmethod_name = "bist_{0}".format(testname) -            sys.stderr.write( -                "Executing test method: {0}\n\n".format(testmethod_name) -            ) -            try: -                status, data = getattr(self, testmethod_name)() -                data['status'] = status -                data['error_msg'] = data.get('error_msg', '') -                return status, data -            except AttributeError: -                sys.stderr.write("Test not defined: {}\n".format(testname)) -                return False, {} -            except Exception as ex: -                sys.stderr.write( -                    "Test {} failed to execute: {}\n".format(testname, str(ex)) -                ) -                if self.args.debug: -                    raise -                return False, {'error_msg': str(ex)} -        tests_successful = True -        result = {} -        for test in self.tests_to_run: -            status, result_data = execute_test(test) -            tests_successful = tests_successful and status -            result[test] = result_data -        if self.args.lv_compat: -            result = filter_results_for_lv(result) -        post_results(result) -        if self.reload_fpga_image and not self.args.skip_fpga_reload: -            load_fpga_image(self.DEFAULT_FPGA_TYPE) -        return tests_successful  #############################################################################  # BISTS  # All bist_* methods must return True/False success values!  ############################################################################# -    def bist_rtc(self): -        """ -        BIST for RTC (real time clock) - -        Return dictionary: -        - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 -                format, as a string. As if running 'date -Iseconds -u'. -        - time: Same time, but in seconds since epoch. - -        Return status: -        Unless datetime throws an exception, returns True. -        """ -        assert 'rtc' in self.tests_to_run -        utc_now = datetime.utcnow() -        return True, { -            'time': time.mktime(utc_now.timetuple()), -            'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", -        } -      def bist_gyro(self):          """          BIST for GYRO (MPU9250) @@ -458,25 +151,7 @@ class E320BIST(object):          assert 'ddr3' in self.tests_to_run          if self.args.dry_run:              return True, {'throughput': 1250e6} -        result = {} -        ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' -        try: -            output = subprocess.check_output( -                ddr3_bist_executor, -                stderr=subprocess.STDOUT, -                shell=True, -            ) -        except subprocess.CalledProcessError as ex: -            # Don't throw errors from uhd_usrp_probe -            output = ex.output -        output = output.decode("utf-8") -        mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output) -        if mobj is not None: -            result['throughput'] = float(mobj.group('thrup')) * 1000 -        else: -            result['throughput'] = 0 -            result['error_msg'] = result.get('error_msg', '') + \ -                                        "\n\nFailed match throughput regex!" +        result = bist.test_ddr3_with_usrp_probe()          return result.get('throughput', 0) > 1000e3, result      def bist_gpsdo(self): @@ -522,7 +197,7 @@ class E320BIST(object):          sys.stderr.write(              "Waiting for WARMUP to go low for up to {} seconds...\n".format(                  gps_warmup_timeout)) -        if not poll_with_timeout( +        if not bist.poll_with_timeout(                  lambda: not bool((mb_regs.get_gps_status() >> 4) & 0x1),                  gps_warmup_timeout*1000, 1000              ): @@ -534,7 +209,7 @@ class E320BIST(object):          sys.stderr.write(              "Waiting for LOCKOK to go high for up to {} seconds...\n".format(                  gps_lockok_timeout)) -        if not poll_with_timeout( +        if not bist.poll_with_timeout(                  mb_regs.get_gps_locked_val,                  gps_lockok_timeout*1000,                  1000 @@ -550,24 +225,8 @@ class E320BIST(object):          sys.stderr.write("GPS-ALARM status: {}\n".format(              (gps_status >> 1) & 0x1          )) -        # Now read back response from chip -        my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -        my_sock.connect(('localhost', 2947)) -        sys.stderr.write("Connected to GPSDO socket.\n") -        query_cmd = b'?WATCH={"enable":true,"json":true}' -        my_sock.sendall(query_cmd) -        sys.stderr.write("Sent query: {}\n".format(query_cmd)) -        sock_read_line(my_sock, timeout=10) -        sys.stderr.write("Received initial newline.\n") -        result = {} -        while result.get('class', None) != 'TPV': -            json_result = sock_read_line(my_sock, timeout=60) -            sys.stderr.write( -                "Received JSON response: {}\n\n".format(json_result) -            ) -            result = json.loads(json_result) -        my_sock.sendall(b'?WATCH={"enable":false}') -        my_sock.close() +        # Now the chip is on, read back the TPV result +        result = bist.get_gpsd_tpv_result()          # If we reach this line, we have a valid result and the chip responded.          # However, it doesn't necessarily mean we had a GPS lock.          return True, result @@ -589,60 +248,9 @@ class E320BIST(object):              return True, {                  'tpm0_caps': "Fake caps value\n\nVersion 0.0.0",              } -        result = {} -        props_to_read = ('caps',) -        base_path = '/sys/class/tpm' -        for tpm_device in os.listdir(base_path): -            if tpm_device.startswith('tpm'): -                for key in props_to_read: -                    result['{}_{}'.format(tpm_device, key)] = open( -                        os.path.join(base_path, tpm_device, key), 'r' -                    ).read().strip() +        result = bist.get_tpm_caps_info()          return len(result) == 1, result -    def ref_clock_helper(self,clock_source): -        """ -        Helper function to determine reference clock lock -        Description: Checks to see if we can lock to a clock source. - -        External Equipment: None -        Return dictionary: -         - <sensor-name>: -           - locked: Boolean lock status -        """ -        assert clock_source in ("internal", "external"),\ -            "Invalid clock source selected ({}). Valid choices: {}".format( -                clock_source, ("internal", "external")) -        if self.args.dry_run: -            return True, {'ref_locked': True} -        result = {} -        env = os.environ.copy() -        env['UHD_LOG_CONSOLE_LEVEL'] = 'error' -        cmd = ['uhd_usrp_probe', '--args', 'addr=127.0.0.1,clock_source=' + clock_source, -            '--sensor'] -        sensor_path = '/mboards/0/sensors/ref_locked' -        cmd.append(sensor_path) -        ref_lock_executor = ' '.join(cmd) -        try: -            output = subprocess.check_output( -                ref_lock_executor, -                stderr=subprocess.STDOUT, -                env=env, -                shell=True, -            ) -        except subprocess.CalledProcessError as ex: -            # Don't throw errors from uhd_usrp_probe -            output = ex.output -        output = output.decode("utf-8") -        mobj = re.search(r"true$", output.strip()) -        if mobj is not None: -            result['ref_locked'] = True -        else: -            result['ref_locked'] = False -            result['error_msg'] = ("Reference Clock not locked." -                                   " Extra output:" + output) -        return result -      def bist_ref_clock_int(self):          """          BIST for clock lock from internal (20MHz). @@ -658,7 +266,9 @@ class E320BIST(object):          need to be asserted.          """          assert 'ref_clock_int' in self.tests_to_run -        result = self.ref_clock_helper('internal') +        if self.args.dry_run: +            return True, {'ref_locked': True} +        result = bist.get_ref_clock_prop('internal', 'internal')          return 'error_msg' not in result, result      def bist_ref_clock_ext(self): @@ -676,7 +286,9 @@ class E320BIST(object):          need to be asserted.          """          assert 'ref_clock_ext' in self.tests_to_run -        result = self.ref_clock_helper('external') +        if self.args.dry_run: +            return True, {'ref_locked': True} +        result = bist.get_ref_clock_prop('external', 'external')          return 'error_msg' not in result, result      def bist_sfp_loopback(self): @@ -696,10 +308,14 @@ class E320BIST(object):          - bits: Number of bits that were transferred          """          if self.args.dry_run: -            return True, get_sfp_bist_defaults() -        sfp_bist_results = run_aurora_bist(master='misc-auro-regs') +            return True, bist.get_sfp_bist_defaults() +        sfp_bist_results = bist.run_aurora_bist( +            device_args=self.device_args, +            product_id=self.get_product_id(), +            master='misc-auro-regs', +        )          self.reload_fpga_image = True -        return aurora_results_to_status(sfp_bist_results) +        return bist.aurora_results_to_status(sfp_bist_results)      def bist_gpio(self):          """ @@ -737,7 +353,7 @@ class E320BIST(object):              " Run a GPIO test for a given set of patterns "              gpio_ctrl = e320_periphs.FrontpanelGPIO(ddr)              for pattern in patterns: -                gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) +                bist.gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr)                  time.sleep(0.1)                  gpio_rb = gpio_ctrl.get_all()                  if pattern != gpio_rb: @@ -763,16 +379,8 @@ class E320BIST(object):          assert 'temp' in self.tests_to_run          if self.args.dry_run:              return True, {'internal': 30000} -        import pyudev -        context = pyudev.Context() -        result = { -            temp_sensor_map[device.sys_name]: \ -                    int(device.attributes.get('temp').decode('ascii')) -            for device in context.list_devices(subsystem='thermal') - -            if 'temp' in device.attributes.available_attributes \ -                    and device.attributes.get('temp') is not None -        } +        result = bist.get_temp_sensor_value( +            lambda device: TEMP_SENSOR_MAP[device.sys_name])          if len(result) < 1:              result['error_msg'] = "No temperature sensors found!"          return 'error_msg' not in result, result @@ -790,14 +398,7 @@ class E320BIST(object):          assert 'fan' in self.tests_to_run          if self.args.dry_run:              return True, {'cooling_device0': 10000} -        import pyudev -        context = pyudev.Context() -        result = { -            device.sys_name: int(device.attributes.get('cur_state')) -            for device in context.list_devices(subsystem='thermal') -            if 'cur_state' in device.attributes.available_attributes \ -                    and device.attributes.get('cur_state') is not None -        } +        result = bist.get_fan_values()          return len(result) == 1, result      def bist_link_up(self): @@ -812,65 +413,9 @@ class E320BIST(object):          assert 'link_up' in self.tests_to_run          if self.args.dry_run:              return True, {'sfp0': 'UP'} -        from pyroute2 import IPRoute -        result = {} -        with IPRoute() as ipr: -            links = ipr.link_lookup(ifname='sfp0') -            if not links: -                return False, {'error_msg': "No interface found"} -            link_info = next(iter(ipr.get_links(links)), None) -            if link_info == None: -                return False, {'error_msg': "Error on get_links for sfp0"} -            result['sfp0'] = link_info.get_attr('IFLA_OPERSTATE') -            if result['sfp0'] != 'UP': -                result['error_msg'] = "Link not up for interface" +        result = bist.get_link_up('sfp0')          return 'error_msg' not in result, result - -def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): -    """Helper function for set gpio. -    What this function do is take decimal value and convert to a binary string -    then try to set those individual bits to the gpio_bank. -    Arguments: -        gpio_bank  -- gpio bank type. -        value -- value to set onto gpio bank. -        gpio_size -- size of the gpio bank -        ddr_mask  -- data direction register bit mask. 0 is input; 1 is output. -    """ -    ddr_size = bin(ddr_mask).count("1") -    value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] -    ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] -    for i in range(gpio_size): -        if ddr_bitstring[gpio_size - 1 - i] == "1": -            gpio_bank.set(i, value_bitstring[i % ddr_size]) - -def get_product_id(): -    """Return the mboard product ID (e320):""" -    cmd = ['eeprom-id'] -    output = subprocess.check_output( -        cmd, -        stderr=subprocess.STDOUT, -        shell=True, -    ).decode('utf-8') -    if 'e320' in output: -        return 'e320' -    raise AssertionError("Cannot determine product ID.") - -def load_fpga_image(fpga_type): -    """Load an FPGA image (1G, XG, AA, ...)""" -    cmd = ['uhd_image_loader', '--args', 'type=e3xx,addr=127.0.0.1', '--fpga-path'] -    images_folder = '/usr/share/uhd/images/' -    fpga_file_name = \ -            'usrp_' + get_product_id() + '_fpga_' + fpga_type.upper() + '.bit' -    fpga_image = images_folder + fpga_file_name -    cmd.append(fpga_image) -    cmd_str = ' '.join(cmd) -    subprocess.check_output( -        cmd_str, -        stderr=subprocess.STDOUT, -        shell=True -    ) -  ##############################################################################  # main  ############################################################################## diff --git a/mpm/python/n3xx_bist b/mpm/python/n3xx_bist index 18f1c3b4b..cc7dd40c6 100755 --- a/mpm/python/n3xx_bist +++ b/mpm/python/n3xx_bist @@ -1,6 +1,6 @@  #!/usr/bin/env python3  # -# Copyright 2017 Ettus Research, National Instruments Company +# Copyright 2017-2018 Ettus Research, National Instruments Company  #  # SPDX-License-Identifier: GPL-3.0-or-later  # @@ -11,17 +11,9 @@ Will work on all derivatives of the N3xx series.  """  from __future__ import print_function -import os  import sys -import subprocess -import re -import socket -import select  import time -import json -from datetime import datetime -import argparse -from six import iteritems +from usrp_mpm import bist  # Timeout values are in seconds:  GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute" @@ -31,105 +23,20 @@ GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test                         # by default.  ############################################################################## -# Aurora/SFP BIST code +# Bist class  ############################################################################## -def get_sfp_bist_defaults(): -    " Default dictionary for SFP/Aurora BIST dry-runs " -    return { -        'elapsed_time': 1.0, -        'max_roundtrip_latency': 0.8e-6, -        'throughput': 1000e6, -        'max_ber': 8.5e-11, -        'errors': 0, -        'bits': 12012486656, -    } - -def assert_aurora_image(master, slave): -    """ -    Make sure we have an FPGA image with which we can run the requested tests. - -    Will load an AA image if not, which always satisfies all conditions for -    running Aurora tests. -    """ -    from usrp_mpm.sys_utils import uio -    if not uio.find_uio_device(master)[0] or \ -            (slave is not None and not uio.find_uio_device(slave)[0]): -        load_fpga_image('AA') - -def run_aurora_bist(master, slave=None): +class N3XXBIST(bist.UsrpBIST):      """ -    Spawn a BER test -    """ -    from usrp_mpm import aurora_control -    from usrp_mpm.sys_utils.uio import open_uio - -    class DummyContext(object): -        """Dummy class for context managers""" -        def __enter__(self): -            return - -        def __exit__(self, exc_type, exc_value, traceback): -            return exc_type is None - -    # Go, go, go! -    try: -        assert_aurora_image(master, slave) -        with open_uio(label=master, read_only=False) as master_au_uio: -            master_au_ctrl = aurora_control.AuroraControl(master_au_uio) -            with open_uio(label=slave, read_only=False)\ -                    if slave is not None else DummyContext() as slave_au_uio: -                slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\ -                    if slave is not None else None -                return master_au_ctrl.run_ber_loopback_bist( -                    duration=10, -                    requested_rate=1300 * 8e6, -                    slave=slave_au_ctrl, -                ) -    except Exception as ex: -        print("Unexpected exception: {}".format(str(ex))) -        exit(1) - - -def aurora_results_to_status(bist_results): -    """ -    Convert a dictionary coming from AuroraControl BIST to one that we can use -    for this BIST +    BIST Tool for the USRP N3xx series      """ -    return bist_results['mst_errors'] == 0, { -        'elapsed_time': bist_results['time_elapsed'], -        'max_roundtrip_latency': bist_results['mst_latency_us'], -        'throughput': bist_results['approx_throughput'], -        'max_ber': bist_results['max_ber'], -        'errors': bist_results['mst_errors'], -        'bits': bist_results['mst_samps'], +    usrp_type = "N3XX" +    # This defines special tests that are really collections of other tests. +    collections = { +        'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm"], +        'extended': "*",      } - -############################################################################## -# Helpers -############################################################################## -def post_results(results): -    """ -    Given a dictionary, post the results. - -    This will print the results as JSON to stdout. -    """ -    print(json.dumps( -        results, -        sort_keys=True, -        indent=4, -        separators=(',', ': ') -    )) - -def filter_results_for_lv(results): -    """ -    The LabView JSON parser does not support a variety of things, such as -    nested dicts, and some downstream LV applications freak out if certain keys -    are not what they expect. -    This is a long hard-coded list of how results should look like for those -    cases. Note: This list needs manual supervision and attention for the case -    where either subsystems get renamed, or other architectural changes should -    occur. -    """ +    # Default FPGA image type +    DEFAULT_FPGA_TYPE = 'HG'      lv_compat_format = {          'ddr3': {              'throughput': -1, @@ -184,240 +91,24 @@ def filter_results_for_lv(results):              'lock_status': 0,          },      } -    # OK now go and brush up the results: -    def fixup_dict(result_dict, ref_dict): -        """ -        Touches up result_dict according to ref_dict by the following rules: -        - If a key is in result_dict that is not in ref_dict, delete that -        - If a key is in ref_dict that is not in result_dict, use the value -          from ref_dict -        """ -        ref_dict['error_msg'] = "" -        ref_dict['status'] = False -        result_dict = { -            k: v for k, v in iteritems(result_dict) -            if k in ref_dict or k in ('error_msg', 'status') -        } -        result_dict = { -            k: result_dict.get(k, ref_dict[k]) for k in ref_dict -        } -        return result_dict -    results = { -        testname: fixup_dict(testresults, lv_compat_format[testname]) \ -                    if testname in lv_compat_format else testresults -        for testname, testresults in iteritems(results) -    } -    return results - -def sock_read_line(my_sock, timeout=60, interval=0.1): -    """ -    Read from a socket until newline. If there was no newline until the timeout -    occurs, raise an error. Otherwise, return the line. -    """ -    line = b'' -    end_time = time.time() + timeout -    while time.time() < end_time: -        socket_ready = select.select([my_sock], [], [], 0)[0] -        if socket_ready: -            next_char = my_sock.recv(1) -            if next_char == b'\n': -                return line.decode('ascii') -            line += next_char -        else: -            time.sleep(interval) -    raise RuntimeError("sock_read_line() exceeded read timeout!") - -def poll_with_timeout(state_check, timeout_ms, interval_ms): -    """ -    Calls state_check() every interval_ms until it returns a positive value, or -    until a timeout is exceeded. - -    Returns True if state_check() returned True within the timeout. -    """ -    max_time = time.time() + (float(timeout_ms) / 1000) -    interval_s = float(interval_ms) / 1000 -    while time.time() < max_time: -        if state_check(): -            return True -        time.sleep(interval_s) -    return False - -def expand_options(option_list): -    """ -    Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', -    'spam': 'eggs'}. -    """ -    return dict(x.split('=') for x in option_list) - -############################################################################## -# Bist class -############################################################################## -class N3XXBIST(object): -    """ -    BIST Tool for the USRP N3xx series -    """ -    # This defines special tests that are really collections of other tests. -    collections = { -        'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm"], -        'extended': "*", -    } -    # Default FPGA image type -    DEFAULT_FPGA_TYPE = 'HG' - -    @staticmethod -    def make_arg_parser(): -        """ -        Return arg parser -        """ -        parser = argparse.ArgumentParser( -            description="N3xx BIST Tool", -        ) -        parser.add_argument( -            '-n', '--dry-run', action='store_true', -            help="Fake out the tests. All tests will return a valid" \ -                 " response, but will not actually interact with hardware.", -        ) -        parser.add_argument( -            '-v', '--verbose', action='store_true', -            help="Crank up verbosity level", -        ) -        parser.add_argument( -            '--debug', action='store_true', -            help="For debugging this tool.", -        ) -        parser.add_argument( -            '--option', '-o', action='append', default=[], -            help="Option for individual test.", -        ) -        parser.add_argument( -            '--lv-compat', action='store_true', -            help="Provides compatibility with the LV JSON parser. Don't run " -                 "this mode unless you know what you're doing. The JSON " -                 "output does not necessarily reflect the actual system " -                 "status when using this mode.", -        ) -        parser.add_argument( -            '--skip-fpga-reload', action='store_true', -            help="Skip reloading the default FPGA image post-test. Note: by" -                 "specifying this argument, the FPGA image loaded could be " -                 "anything post-test.", -        ) -        parser.add_argument( -            'tests', -            help="List the tests that should be run", -            nargs='+', # There has to be at least one -        ) -        return parser +    device_args = "type=n3xx,addr=127.0.0.1"      def __init__(self): -        self.args = N3XXBIST.make_arg_parser().parse_args() -        self.args.option = expand_options(self.args.option) -        # If this is true, trigger a reload of the default FPGA image -        self.reload_fpga_image = False -        try: -            from usrp_mpm.periph_manager.n3xx import n3xx -            default_rev = n3xx.mboard_max_rev -        except ImportError: -            # This means we're in dry run mode or something like that, so just -            # pick something -            default_rev = 3 -        self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) -        self.tests_to_run = set() -        for test in self.args.tests: -            if test in self.collections: -                for test in self.expand_collection(test): -                    self.tests_to_run.add(test) -            else: -                self.tests_to_run.add(test) -        try: -            # Keep this import here so we can do dry-runs without any MPM code -            from usrp_mpm import get_main_logger -            if not self.args.verbose: -                from usrp_mpm.mpmlog import WARNING -                get_main_logger().setLevel(WARNING) -            self.log = get_main_logger().getChild('main') -        except ImportError: -            print("No logging capability available.") - -    def expand_collection(self, coll): -        """ -        Return names of tests in a collection -        """ -        tests = self.collections[coll] -        if tests == "*": -            tests = {x.replace('bist_', '') -                     for x in dir(self) -                     if x.find('bist_') == 0 -                    } -        else: -            tests = set(tests) -        return tests - -    def run(self): -        """ -        Execute tests. +        bist.UsrpBIST.__init__(self) -        Returns True on Success. -        """ -        def execute_test(testname): -            """ -            Actually run a test. -            """ -            testmethod_name = "bist_{0}".format(testname) -            sys.stderr.write( -                "Executing test method: {0}\n\n".format(testmethod_name) -            ) -            try: -                status, data = getattr(self, testmethod_name)() -                data['status'] = status -                data['error_msg'] = data.get('error_msg', '') -                return status, data -            except AttributeError: -                sys.stderr.write("Test not defined: {}\n".format(testname)) -                return False, {} -            except Exception as ex: -                sys.stderr.write( -                    "Test {} failed to execute: {}\n".format(testname, str(ex)) -                ) -                if self.args.debug: -                    raise -                return False, {'error_msg': str(ex)} -        tests_successful = True -        result = {} -        for test in self.tests_to_run: -            status, result_data = execute_test(test) -            tests_successful = tests_successful and status -            result[test] = result_data -        if self.args.lv_compat: -            result = filter_results_for_lv(result) -        post_results(result) -        if self.reload_fpga_image and not self.args.skip_fpga_reload: -            load_fpga_image(self.DEFAULT_FPGA_TYPE) -        return tests_successful +    def get_mb_periph_mgr(self): +        """Return reference to an n3xx periph manager""" +        from usrp_mpm.periph_manager.n3xx import n3xx +        return n3xx + +    def get_product_id(self): +        """Return the mboard product ID (n310 or n300):""" +        return bist.get_product_id_from_eeprom(valid_ids=['n300', 'n310'])  #############################################################################  # BISTS  # All bist_* methods must return True/False success values!  ############################################################################# -    def bist_rtc(self): -        """ -        BIST for RTC (real time clock) - -        Return dictionary: -        - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 -                format, as a string. As if running 'date -Iseconds -u'. -        - time: Same time, but in seconds since epoch. - -        Return status: -        Unless datetime throws an exception, returns True. -        """ -        assert 'rtc' in self.tests_to_run -        utc_now = datetime.utcnow() -        return True, { -            'time': time.mktime(utc_now.timetuple()), -            'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", -        } -      def bist_ddr3(self):          """          BIST for PL DDR3 DRAM @@ -437,25 +128,7 @@ class N3XXBIST(object):          assert 'ddr3' in self.tests_to_run          if self.args.dry_run:              return True, {'throughput': 1250e6} -        result = {} -        ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' -        try: -            output = subprocess.check_output( -                ddr3_bist_executor, -                stderr=subprocess.STDOUT, -                shell=True, -            ) -        except subprocess.CalledProcessError as ex: -            # Don't throw errors from uhd_usrp_probe -            output = ex.output -        output = output.decode("utf-8") -        mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output) -        if mobj is not None: -            result['throughput'] = float(mobj.group('thrup')) * 1000 -        else: -            result['throughput'] = 0 -            result['error_msg'] = result.get('error_msg', '') + \ -                                        "\n\nFailed match throughput regex!" +        result = bist.test_ddr3_with_usrp_probe()          return result.get('throughput', 0) > 1000e3, result      def bist_gpsdo(self): @@ -501,7 +174,7 @@ class N3XXBIST(object):          sys.stderr.write(              "Waiting for WARMUP to go low for up to {} seconds...\n".format(                  gps_warmup_timeout)) -        if not poll_with_timeout( +        if not bist.poll_with_timeout(                  lambda: not gpio_tca6424.get('GPS-WARMUP'),                  gps_warmup_timeout*1000, 1000              ): @@ -513,7 +186,7 @@ class N3XXBIST(object):          sys.stderr.write(              "Waiting for LOCKOK to go high for up to {} seconds...\n".format(                  gps_lockok_timeout)) -        if not poll_with_timeout( +        if not bist.poll_with_timeout(                  lambda: gpio_tca6424.get('GPS-LOCKOK'),                  gps_lockok_timeout*1000,                  1000 @@ -528,24 +201,8 @@ class N3XXBIST(object):          sys.stderr.write("GPS-ALARM status: {}\n".format(              gpio_tca6424.get('GPS-ALARM')          )) -        # Now read back response from chip -        my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -        my_sock.connect(('localhost', 2947)) -        sys.stderr.write("Connected to GPSDO socket.\n") -        query_cmd = b'?WATCH={"enable":true,"json":true}' -        my_sock.sendall(query_cmd) -        sys.stderr.write("Sent query: {}\n".format(query_cmd)) -        sock_read_line(my_sock, timeout=10) -        sys.stderr.write("Received initial newline.\n") -        result = {} -        while result.get('class', None) != 'TPV': -            json_result = sock_read_line(my_sock, timeout=60) -            sys.stderr.write( -                "Received JSON response: {}\n\n".format(json_result) -            ) -            result = json.loads(json_result) -        my_sock.sendall(b'?WATCH={"enable":false}') -        my_sock.close() +        # Now the chip is on, read back the TPV result +        result = bist.get_gpsd_tpv_result()          # If we reach this line, we have a valid result and the chip responded.          # However, it doesn't necessarily mean we had a GPS lock.          return True, result @@ -567,17 +224,87 @@ class N3XXBIST(object):              return True, {                  'tpm0_caps': "Fake caps value\n\nVersion 0.0.0",              } -        result = {} -        props_to_read = ('caps',) -        base_path = '/sys/class/tpm' -        for tpm_device in os.listdir(base_path): -            if tpm_device.startswith('tpm'): -                for key in props_to_read: -                    result['{}_{}'.format(tpm_device, key)] = open( -                        os.path.join(base_path, tpm_device, key), 'r' -                    ).read().strip() +        result = bist.get_tpm_caps_info()          return len(result) == 1, result +    def bist_ref_clock_int(self): +        """ +        BIST for clock lock from internal (25 MHz) source. +        Description: Checks to see if the daughtercard can lock to an internal +        clock source. + +        External Equipment: None +        Return dictionary: +        - <sensor-name>: +          - locked: Boolean lock status + +        There can be multiple ref lock sensors; for a pass condition they all +        need to be asserted. +        """ +        assert 'ref_clock_int' in self.tests_to_run +        if self.args.dry_run: +            return True, {'ref_locked': True} +        result = bist.get_ref_clock_prop( +            'internal', +            'internal', +            extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0} +        ) +        return 'error_msg' not in result, result + +    def bist_ref_clock_ext(self): +        """ +        BIST for clock lock from external source. Note: This test requires a +        connected daughterboard with a 'ref lock' sensor available. + +        Description: Checks to see if the daughtercard can lock to the external +        reference clock. + +        External Equipment: 10 MHz reference Source connected to "ref in". + +        Return dictionary: +        - <sensor-name>: +          - locked: Boolean lock status + +        There can be multiple ref lock sensors; for a pass condition they all +        need to be asserted. +        """ +        assert 'ref_clock_ext' in self.tests_to_run +        if self.args.dry_run: +            return True, {'ref_locked': True} +        result = bist.get_ref_clock_prop( +            'external', +            'external', +            extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0} +        ) +        return 'error_msg' not in result, result + +    def bist_ref_clock_gpsdo(self): +        """ +        BIST for clock lock from external source. Note: This test requires a +        connected daughterboard with a 'ref lock' sensor available. + +        Description: Checks to see if the daughtercard can lock to the external +        reference clock. + +        External Equipment: 10 MHz reference Source connected to "ref in". + +        Return dictionary: +        - <sensor-name>: +          - locked: Boolean lock status + +        There can be multiple ref lock sensors; for a pass condition they all +        need to be asserted. +        """ +        assert 'ref_clock_gpsdo' in self.tests_to_run +        if self.args.dry_run: +            return True, {'ref_locked': True} +        result = bist.get_ref_clock_prop( +            'gpsdo', +            'gpsdo', +            extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0} +        ) +        return 'error_msg' not in result, result +      def bist_sfp0_loopback(self):          """          BIST for SFP+ ports: @@ -596,10 +323,14 @@ class N3XXBIST(object):          - bits: Number of bits that were transferred          """          if self.args.dry_run: -            return True, get_sfp_bist_defaults() -        sfp_bist_results = run_aurora_bist(master='misc-auro-regs0') +            return True, bist.get_sfp_bist_defaults() +        sfp_bist_results = bist.run_aurora_bist( +            device_args=self.device_args, +            product_id=self.get_product_id(), +            master='misc-auro-regs0', +        )          self.reload_fpga_image = True -        return aurora_results_to_status(sfp_bist_results) +        return bist.aurora_results_to_status(sfp_bist_results)      def bist_sfp1_loopback(self):          """ @@ -619,10 +350,14 @@ class N3XXBIST(object):          - bits: Number of bits that were transferred          """          if self.args.dry_run: -            return True, get_sfp_bist_defaults() -        sfp_bist_results = run_aurora_bist(master='misc-auro-regs1') +            return True, bist.get_sfp_bist_defaults() +        sfp_bist_results = bist.run_aurora_bist( +            device_args=self.device_args, +            product_id=self.get_product_id(), +            master='misc-auro-regs1', +        )          self.reload_fpga_image = True -        return aurora_results_to_status(sfp_bist_results) +        return bist.aurora_results_to_status(sfp_bist_results)      def bist_sfp_loopback(self):          """ @@ -642,13 +377,15 @@ class N3XXBIST(object):          - bits: Number of bits that were transferred          """          if self.args.dry_run: -            return True, get_sfp_bist_defaults() -        sfp_bist_results = run_aurora_bist( +            return True, bist.get_sfp_bist_defaults() +        sfp_bist_results = bist.run_aurora_bist( +            device_args=self.device_args, +            product_id=self.get_product_id(),              master='misc-auro-regs0',              slave='misc-auro-regs1',          )          self.reload_fpga_image = True -        return aurora_results_to_status(sfp_bist_results) +        return bist.aurora_results_to_status(sfp_bist_results)      def bist_gpio(self):          """ @@ -690,7 +427,7 @@ class N3XXBIST(object):              " Run a GPIO test for a given set of patterns "              gpio_ctrl = n3xx_periphs.FrontpanelGPIO(ddr)              for pattern in patterns: -                gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) +                bist.gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr)                  time.sleep(0.1)                  gpio_rb = gpio_ctrl.get_all()                  if  pattern != gpio_rb: @@ -716,15 +453,8 @@ class N3XXBIST(object):          assert 'temp' in self.tests_to_run          if self.args.dry_run:              return True, {'fpga-thermal-zone': 30000} -        import pyudev -        context = pyudev.Context() -        result = { -            device.attributes.get('type').decode('ascii'): \ -                    int(device.attributes.get('temp').decode('ascii')) -            for device in context.list_devices(subsystem='thermal') -            if 'temp' in device.attributes.available_attributes \ -                    and device.attributes.get('temp') is not None -        } +        result = bist.get_temp_sensor_value( +            lambda device: device.attributes.get('type').decode('ascii'))          if len(result) < 1:              result['error_msg'] = "No temperature sensors found!"          return 'error_msg' not in result, result @@ -742,14 +472,7 @@ class N3XXBIST(object):          assert 'fan' in self.tests_to_run          if self.args.dry_run:              return True, {'cooling_device0': 10000, 'cooling_device1': 10000} -        import pyudev -        context = pyudev.Context() -        result = { -            device.sys_name: int(device.attributes.get('cur_state')) -            for device in context.list_devices(subsystem='thermal') -            if 'cur_state' in device.attributes.available_attributes \ -                    and device.attributes.get('cur_state') is not None -        } +        result = bist.get_fan_values()          return len(result) == 2, result      def bist_whiterabbit(self): @@ -768,7 +491,11 @@ class N3XXBIST(object):          from usrp_mpm.sys_utils import uio          if not uio.find_uio_device(n3xx.wr_regs_label, logger=self.log)[0]:              self.log.info("Need to load WX image before proceeding...") -            load_fpga_image('WX') +            bist.load_fpga_image( +                'WX', +                self.device_args, +                self.get_product_id(), +            )              self.log.info("Image loading complete.")          self.reload_fpga_image = True          mb_regs = n3xx_periphs.MboardRegsControl( @@ -776,7 +503,7 @@ class N3XXBIST(object):          mb_regs.set_time_source('sfp0', 25e6)          wr_regs_control = WhiteRabbitRegsControl(              n3xx.wr_regs_label, self.log) -        lock_status = poll_with_timeout( +        lock_status = bist.poll_with_timeout(              lambda: wr_regs_control.get_time_lock_status(),              40000, # Try for x ms... this number is set from a few benchtop tests              1000, # Poll every... second! why not? @@ -786,160 +513,6 @@ class N3XXBIST(object):          }          return lock_status, result -    def bist_ref_clock_int(self): -        """ -        BIST for clock and pps lock from internal (25MHz). -        Description: Checks to see if we can lock to an internal -        clock source. - -        External Equipment: None -        Return dictionary: -        - <sensor-name>: -          - locked: Boolean lock status - -        There can be multiple ref lock sensors; for a pass condition they all -        need to be asserted. -        """ -        assert 'ref_clock_int' in self.tests_to_run -        result = self.ref_clock_helper('internal', 'internal') -        return 'error_msg' not in result, result - -    def bist_ref_clock_ext(self): -        """ -        BIST for clock pps lock from external (10MHz) source. -        Description: Checks to see if we can lock to an external -        clock source. - -        External Equipment: External 10 MHz reference clock needed (from Octoclock) -        Return dictionary: -        - <sensor-name>: -          - locked: Boolean lock status - -        There can be multiple ref lock sensors; for a pass condition they all -        need to be asserted. -        """ -        assert 'ref_clock_ext' in self.tests_to_run -        result = self.ref_clock_helper('external', 'external') -        return 'error_msg' not in result, result - -    def bist_ref_clock_gpsdo(self): -        """ -        BIST for clock and pps lock from gpsdo (20MHz) source. -        Description: Checks to see if we can lock to an external -        clock source. - -        External Equipment: None -        Return dictionary: -        - <sensor-name>: -          - locked: Boolean lock status - -        There can be multiple ref lock sensors; for a pass condition they all -        need to be asserted. -        """ -        assert 'ref_clock_gpsdo' in self.tests_to_run -        result = self.ref_clock_helper('gpsdo', 'gpsdo') -        return 'error_msg' not in result, result - -    def ref_clock_helper(self, clock_source, time_source): -        """ -        Helper function to determine reference clock lock -        Description: Checks to see if we can lock to a clock source. -        External Equipment: None -        Return dictionary: -         - <sensor-name>: -           - locked: Boolean lock status -        """ -        assert clock_source in ("gpsdo", "internal", "external"),\ -            "Invalid clock source selected ({}). Valid choices: {}".format( -                clock_source, ("gpsdo", "internal", "external")) - -        assert time_source in ("gpsdo", "internal", "external", "sfp0"),\ -            "Invalid time source selected ({}). Valid choices: {}".format( -                time_source, ("gpsdo", "internal", "external", "sfp0")) - -        if self.args.dry_run: -            return True, {'ref_locked': True} -        result = {} -        env = os.environ.copy() -        env['UHD_LOG_CONSOLE_LEVEL'] = 'error' -        cmd = ['uhd_usrp_probe', '--args', -               'addr=127.0.0.1,' -               'rfnoc_num_blocks=0,' -               'skip_rfic=1,' -               'clock_source={c},time_source={t}'.format(c=clock_source, -                                                         t=time_source), -               '--sensor'] -        sensor_path = '/mboards/0/sensors/ref_locked' -        cmd.append(sensor_path) -        ref_lock_executor = ' '.join(cmd) -        try: -            output = subprocess.check_output( -                ref_lock_executor, -                stderr=subprocess.STDOUT, -                env=env, -                shell=True, -            ) -        except subprocess.CalledProcessError as ex: -            # Don't throw errors from uhd_usrp_probe -            output = ex.output -        output = output.decode("utf-8") -        mobj = re.search(r"true$", output.strip()) -        if mobj is not None: -            result['ref_locked'] = True -        else: -            result['ref_locked'] = False -            result['error_msg'] = ("Reference Clock not locked." -                                   " Extra output:" + output) -        return result - - - -def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): -    """Helper function for set gpio. -    What this function do is take decimal value and convert to a binary string -    then try to set those individual bits to the gpio_bank. -    Arguments: -        gpio_bank  -- gpio bank type. -        value -- value to set onto gpio bank. -        gpio_size -- size of the gpio bank -        ddr_mask  -- data direction register bit mask. 0 is input; 1 is output. -    """ -    ddr_size = bin(ddr_mask).count("1") -    value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] -    ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] -    for i in range(gpio_size): -        if ddr_bitstring[gpio_size - 1 - i] == "1": -            gpio_bank.set(i, value_bitstring[i % ddr_size]) - -def get_product_id(): -    """Return the mboard product ID (n310 or n300):""" -    cmd = ['eeprom-id'] -    output = subprocess.check_output( -        cmd, -        stderr=subprocess.STDOUT, -        shell=True, -    ).decode('utf-8') -    if 'n310' in output: -        return 'n310' -    elif 'n300' in output: -        return 'n300' -    raise AssertionError("Cannot determine product ID.") - -def load_fpga_image(fpga_type): -    """Load an FPGA image (HG, XG, AA, ...)""" -    cmd = ['uhd_image_loader', '--args', 'type=n3xx', '--fpga-path'] -    images_folder = '/usr/share/uhd/images/' -    fpga_file_name = \ -            'usrp_' + get_product_id() + '_fpga_' + fpga_type.upper() + '.bit' -    fpga_image = images_folder + fpga_file_name -    cmd.append(fpga_image) -    cmd_str = ' '.join(cmd) -    subprocess.check_output( -        cmd_str, -        stderr=subprocess.STDOUT, -        shell=True -    ) -  ##############################################################################  # main  ############################################################################## diff --git a/mpm/python/usrp_mpm/bist.py b/mpm/python/usrp_mpm/bist.py new file mode 100644 index 000000000..b16fefa35 --- /dev/null +++ b/mpm/python/usrp_mpm/bist.py @@ -0,0 +1,598 @@ +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Utilities to write BIST executables for USRPs +""" + +import os +import re +import sys +import time +import json +import select +import socket +from datetime import datetime +import argparse +import subprocess +from six import iteritems + +############################################################################## +# Aurora/SFP BIST code +############################################################################## +def get_sfp_bist_defaults(): +    " Default dictionary for SFP/Aurora BIST dry-runs " +    return { +        'elapsed_time': 1.0, +        'max_roundtrip_latency': 0.8e-6, +        'throughput': 1000e6, +        'max_ber': 8.5e-11, +        'errors': 0, +        'bits': 12012486656, +    } + +def assert_aurora_image(master, slave, device_args, product_id, aurora_image_type='AA'): +    """ +    Make sure we have an FPGA image with which we can run the requested tests. + +    Will load an AA image if not, which always satisfies all conditions for +    running Aurora tests. +    """ +    from usrp_mpm.sys_utils import uio +    if not uio.find_uio_device(master)[0] or \ +            (slave is not None and not uio.find_uio_device(slave)[0]): +        load_fpga_image( +            fpga_type=aurora_image_type, +            device_args=device_args, +            product_id=product_id, +        ) + +def aurora_results_to_status(bist_results): +    """ +    Convert a dictionary coming from AuroraControl BIST to one that we can use +    for this BIST +    """ +    return bist_results['mst_errors'] == 0, { +        'elapsed_time': bist_results['time_elapsed'], +        'max_roundtrip_latency': bist_results['mst_latency_us'], +        'throughput': bist_results['approx_throughput'], +        'max_ber': bist_results['max_ber'], +        'errors': bist_results['mst_errors'], +        'bits': bist_results['mst_samps'], +    } + +def run_aurora_bist( +        device_args, +        product_id, +        master, +        slave=None, +        requested_rate=1300*8e6, +        aurora_image_type='AA'): +    """ +    Spawn a BER test +    """ +    from usrp_mpm import aurora_control +    from usrp_mpm.sys_utils.uio import open_uio + +    class DummyContext(object): +        """Dummy class for context managers""" +        def __enter__(self): +            return + +        def __exit__(self, exc_type, exc_value, traceback): +            return exc_type is None + +    # Go, go, go! +    try: +        assert_aurora_image(master, slave, device_args, product_id, aurora_image_type) +        with open_uio(label=master, read_only=False) as master_au_uio: +            master_au_ctrl = aurora_control.AuroraControl(master_au_uio) +            with open_uio(label=slave, read_only=False)\ +                    if slave is not None else DummyContext() as slave_au_uio: +                slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\ +                    if slave is not None else None +                return master_au_ctrl.run_ber_loopback_bist( +                    duration=10, +                    requested_rate=requested_rate, +                    slave=slave_au_ctrl, +                ) +    except Exception as ex: +        print("Unexpected exception: {}".format(str(ex))) +        exit(1) + + +############################################################################## +# Helpers +############################################################################## +def post_results(results): +    """ +    Given a dictionary, post the results. + +    This will print the results as JSON to stdout. +    """ +    print(json.dumps( +        results, +        sort_keys=True, +        indent=4, +        separators=(',', ': ') +    )) + +def sock_read_line(my_sock, timeout=60, interval=0.1): +    """ +    Read from a socket until newline. If there was no newline until the timeout +    occurs, raise an error. Otherwise, return the line. +    """ +    line = b'' +    end_time = time.time() + timeout +    while time.time() < end_time: +        socket_ready = select.select([my_sock], [], [], 0)[0] +        if socket_ready: +            next_char = my_sock.recv(1) +            if next_char == b'\n': +                return line.decode('ascii') +            line += next_char +        else: +            time.sleep(interval) +    raise RuntimeError("sock_read_line() exceeded read timeout!") + +def poll_with_timeout(state_check, timeout_ms, interval_ms): +    """ +    Calls state_check() every interval_ms until it returns a positive value, or +    until a timeout is exceeded. + +    Returns True if state_check() returned True within the timeout. +    """ +    max_time = time.time() + (float(timeout_ms) / 1000) +    interval_s = float(interval_ms) / 1000 +    while time.time() < max_time: +        if state_check(): +            return True +        time.sleep(interval_s) +    return False + +def expand_options(option_list): +    """ +    Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', +    'spam': 'eggs'}. +    """ +    return dict(x.split('=') for x in option_list) + +def load_fpga_image( +        fpga_type, +        device_args, +        product_id, +        images_folder='/usr/share/uhd/images/'): +    """Load an FPGA image (1G, XG, AA, ...)""" +    # cmd = ['uhd_image_loader', '--args', 'type=e3xx,addr=127.0.0.1', '--fpga-path'] +    fpga_file_name = \ +        'usrp_' + product_id + '_fpga_' + fpga_type.upper() + '.bit' +    fpga_image = images_folder + fpga_file_name +    cmd = [ +        'uhd_image_loader', +        '--args', device_args, +        '--fpga-path', fpga_image +    ] +    cmd_str = ' '.join(cmd) +    subprocess.check_output( +        cmd_str, +        stderr=subprocess.STDOUT, +        shell=True +    ) + +def filter_results_for_lv(results, lv_compat_format): +    """ +    The LabView JSON parser does not support a variety of things, such as +    nested dicts, and some downstream LV applications freak out if certain keys +    are not what they expect. +    This is a long hard-coded list of how results should look like for those +    cases. Note: This list needs manual supervision and attention for the case +    where either subsystems get renamed, or other architectural changes should +    occur. +    """ +    def fixup_dict(result_dict, ref_dict): +        """ +        Touches up result_dict according to ref_dict by the following rules: +        - If a key is in result_dict that is not in ref_dict, delete that +        - If a key is in ref_dict that is not in result_dict, use the value +          from ref_dict +        """ +        ref_dict['error_msg'] = "" +        ref_dict['status'] = False +        result_dict = { +            k: v for k, v in iteritems(result_dict) +            if k in ref_dict or k in ('error_msg', 'status') +        } +        result_dict = { +            k: result_dict.get(k, ref_dict[k]) for k in ref_dict +        } +        return result_dict +    # GoGoGo +    results = { +        testname: fixup_dict(testresults, lv_compat_format[testname]) \ +                    if testname in lv_compat_format else testresults +        for testname, testresults in iteritems(results) +    } +    return results + +def get_product_id_from_eeprom(valid_ids): +    """Return the mboard product ID + +    Returns something like n300, n310, e320... +    """ +    cmd = ['eeprom-id'] +    output = subprocess.check_output( +        cmd, +        stderr=subprocess.STDOUT, +        shell=True, +    ).decode('utf-8') +    for valid_id in valid_ids: +        if valid_id in output: +            return valid_id +    raise AssertionError("Cannot determine product ID.: `{}'".format(output)) + +def get_tpm_caps_info(): +    """Read 'caps' info from TPM subsystem""" +    result = {} +    props_to_read = ('caps',) +    base_path = '/sys/class/tpm' +    for tpm_device in os.listdir(base_path): +        if tpm_device.startswith('tpm'): +            for key in props_to_read: +                result['{}_{}'.format(tpm_device, key)] = open( +                    os.path.join(base_path, tpm_device, key), 'r' +                ).read().strip() +    return result + +def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): +    """Helper function for set gpio. +    What this function do is take decimal value and convert to a binary string +    then try to set those individual bits to the gpio_bank. +    Arguments: +        gpio_bank  -- gpio bank type. +        value -- value to set onto gpio bank. +        gpio_size -- size of the gpio bank +        ddr_mask  -- data direction register bit mask. 0 is input; 1 is output. +    """ +    ddr_size = bin(ddr_mask).count("1") +    value_bitstring = \ +        ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] +    ddr_bitstring = \ +        ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] +    for i in range(gpio_size): +        if ddr_bitstring[gpio_size - 1 - i] == "1": +            gpio_bank.set(i, value_bitstring[i % ddr_size]) + +############################################################################## +# Common tests +############################################################################## +def test_ddr3_with_usrp_probe(): +    """ +    Run uhd_usrp_probe and scrape the output to see if the DRAM FIFO block is +    reporting a good throughput. This is a bit of a roundabout way of testing +    the DDR3, but it uses existing software and also tests the RFNoC pathways. +    """ +    result = {} +    ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' +    try: +        output = subprocess.check_output( +            ddr3_bist_executor, +            stderr=subprocess.STDOUT, +            shell=True, +        ) +    except subprocess.CalledProcessError as ex: +        # Don't throw errors from uhd_usrp_probe +        output = ex.output +    output = output.decode("utf-8") +    mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output) +    if mobj is not None: +        result['throughput'] = float(mobj.group('thrup')) * 1000 +    else: +        result['throughput'] = 0 +        result['error_msg'] = result.get('error_msg', '') + \ +                                    "\n\nFailed match throughput regex!" +    return result + + +def get_gpsd_tpv_result(): +    """ +    Query gpsd via a socket and return the corresponding JSON result as a +    dictionary. +    """ +    my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +    my_sock.connect(('localhost', 2947)) +    sys.stderr.write("Connected to GPSDO socket.\n") +    query_cmd = b'?WATCH={"enable":true,"json":true}' +    my_sock.sendall(query_cmd) +    sys.stderr.write("Sent query: {}\n".format(query_cmd)) +    sock_read_line(my_sock, timeout=10) +    sys.stderr.write("Received initial newline.\n") +    result = {} +    while result.get('class', None) != 'TPV': +        json_result = sock_read_line(my_sock, timeout=60) +        sys.stderr.write( +            "Received JSON response: {}\n\n".format(json_result) +        ) +        result = json.loads(json_result) +    my_sock.sendall(b'?WATCH={"enable":false}') +    my_sock.close() +    return result + +def get_ref_clock_prop(clock_source, time_source, extra_args=None): +    """ +    Helper function to determine reference clock lock +    Description: Checks to see if we can lock to a clock source. +    The actual value is yanked from the property tree. + +    External Equipment: None +    Return dictionary: +     - <sensor-name>: +       - locked: Boolean lock status +    """ +    extra_args = extra_args or {} +    result = {} +    extra_args_str = ",".join( +        ['{k}={v}'.format(k=k, v=v) for k, v in iteritems(extra_args)]) +    cmd = [ +        'uhd_usrp_probe', +        '--args', +        'addr=127.0.0.1,clock_source={c},time_source={t},{e}'.format( +            c=clock_source, t=time_source, e=extra_args_str), +        '--sensor' +    ] +    sensor_path = '/mboards/0/sensors/ref_locked' +    cmd.append(sensor_path) +    ref_lock_executor = ' '.join(cmd) +    try: +        output = subprocess.check_output( +            ref_lock_executor, +            stderr=subprocess.PIPE, +            shell=True, +        ) +    except subprocess.CalledProcessError as ex: +        # Don't throw errors from uhd_usrp_probe +        output = ex.output +    output = output.decode("utf-8").strip() +    mobj = re.search(r"true$", output) +    if mobj is not None: +        result['ref_locked'] = True +    else: +        result['ref_locked'] = False +        result['error_msg'] = "Reference Clock not locked: " + output +    return result + +def get_temp_sensor_value(temp_sensor_map): +    """ +    Read a temp sensor value from the system and return a dictionary of the +    form {temp_sensor_lookup(device): $temp} +    """ +    import pyudev +    context = pyudev.Context() +    return { +        temp_sensor_map(device): \ +                int(device.attributes.get('temp').decode('ascii')) +        for device in context.list_devices(subsystem='thermal') +        if 'temp' in device.attributes.available_attributes \ +                and device.attributes.get('temp') is not None +    } + +def get_fan_values(): +    """ +    Return a dict of fan name -> fan speed key/values. +    """ +    import pyudev +    context = pyudev.Context() +    return { +        device.sys_name: int(device.attributes.get('cur_state')) +        for device in context.list_devices(subsystem='thermal') +        if 'cur_state' in device.attributes.available_attributes \ +                and device.attributes.get('cur_state') is not None +    } + +def get_link_up(if_name): +    """ +    Return a dictionary {if_name: IFLA_OPERSTATE} +    """ +    from pyroute2 import IPRoute +    result = {} +    with IPRoute() as ipr: +        links = ipr.link_lookup(ifname=if_name) +        if not links: +            return {'error_msg': "No interface found"} +        link_info = next(iter(ipr.get_links(links)), None) +        if link_info is None: +            return {'error_msg': "Error on get_links for sfp0"} +        result['sfp0'] = link_info.get_attr('IFLA_OPERSTATE') +        if result['sfp0'] != 'UP': +            result['error_msg'] = "Link not up for interface" +    return result + + + +############################################################################## +# BIST class +############################################################################## +class UsrpBIST(object): +    """ +    BIST parent class +    """ +    usrp_type = None +    default_rev = 3 # Because why not +    # This defines special tests that are really collections of other tests. +    collections = { +        'standard': ["rtc",], +        'extended': "*", +    } +    # Default FPGA image type +    DEFAULT_FPGA_TYPE = None +    lv_compat_format = None +    device_args = 'addr=127.0.0.1' + +    def make_arg_parser(self): +        """ +        Return arg parser +        """ +        parser = argparse.ArgumentParser( +            description="{} BIST Tool".format(self.usrp_type), +        ) +        parser.add_argument( +            '-n', '--dry-run', action='store_true', +            help="Fake out the tests. All tests will return a valid" \ +                 " response, but will not actually interact with hardware.", +        ) +        parser.add_argument( +            '-v', '--verbose', action='store_true', +            help="Crank up verbosity level", +        ) +        parser.add_argument( +            '--debug', action='store_true', +            help="For debugging this tool.", +        ) +        parser.add_argument( +            '--option', '-o', action='append', default=[], +            help="Option for individual test.", +        ) +        parser.add_argument( +            '--lv-compat', action='store_true', +            help="Provides compatibility with the LV JSON parser. Don't run " +                 "this mode unless you know what you're doing. The JSON " +                 "output does not necessarily reflect the actual system " +                 "status when using this mode.", +        ) +        parser.add_argument( +            '--skip-fpga-reload', action='store_true', +            help="Skip reloading the default FPGA image post-test. Note: by" +                 "specifying this argument, the FPGA image loaded could be " +                 "anything post-test.", +        ) +        parser.add_argument( +            'tests', +            help="List the tests that should be run", +            nargs='+', # There has to be at least one +        ) +        return parser + +    def get_mb_periph_mgr(self): +        """Needs to be implemented by child class""" +        raise NotImplementedError + +    def get_product_id(self): +        """Needs to be implemented by child class""" +        raise NotImplementedError + +    def __init__(self): +        assert self.DEFAULT_FPGA_TYPE is not None +        assert self.device_args is not None +        assert self.usrp_type is not None +        assert self.lv_compat_format is not None +        self.args = self.make_arg_parser().parse_args() +        self.args.option = expand_options(self.args.option) +        # If this is true, trigger a reload of the default FPGA image +        self.reload_fpga_image = False +        try: +            default_rev = self.get_mb_periph_mgr().mboard_max_rev +        except ImportError: +            # This means we're in dry run mode or something like that, so just +            # pick something +            default_rev = self.default_rev +        self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) +        self.tests_to_run = set() +        for test in self.args.tests: +            if test in self.collections: +                for this_test in self.expand_collection(test): +                    self.tests_to_run.add(this_test) +            else: +                self.tests_to_run.add(test) +        try: +            # Keep this import here so we can do dry-runs without any MPM code +            from usrp_mpm import get_main_logger +            if not self.args.verbose: +                from usrp_mpm.mpmlog import WARNING +                get_main_logger().setLevel(WARNING) +            self.log = get_main_logger().getChild('main') +        except ImportError: +            print("No logging capability available.") + +    def expand_collection(self, coll): +        """ +        Return names of tests in a collection +        """ +        tests = self.collections[coll] +        if tests == "*": +            tests = {x.replace('bist_', '') +                     for x in dir(self) +                     if x.find('bist_') == 0 +                    } +        else: +            tests = set(tests) +        return tests + +    def run(self): +        """ +        Execute tests. + +        Returns True on Success. +        """ +        def execute_test(testname): +            """ +            Actually run a test. +            """ +            testmethod_name = "bist_{0}".format(testname) +            sys.stderr.write( +                "Executing test method: {0}\n\n".format(testmethod_name) +            ) +            if not hasattr(self, testmethod_name): +                sys.stderr.write("Test not defined: `{}`\n".format(testname)) +                return False, {} +            try: +                status, data = getattr(self, testmethod_name)() +                data['status'] = status +                data['error_msg'] = data.get('error_msg', '') +                return status, data +            except Exception as ex: +                sys.stderr.write( +                    "Test {} failed to execute: {}\n".format(testname, str(ex)) +                ) +                if self.args.debug: +                    raise +                return False, {'error_msg': str(ex)} +        tests_successful = True +        result = {} +        for test in self.tests_to_run: +            status, result_data = execute_test(test) +            tests_successful = tests_successful and status +            result[test] = result_data +        if self.args.lv_compat: +            result = filter_results_for_lv(result, self.lv_compat_format) +        post_results(result) +        if self.reload_fpga_image and not self.args.skip_fpga_reload: +            load_fpga_image( +                self.DEFAULT_FPGA_TYPE, +                self.device_args, +                self.get_product_id(), +            ) +        return tests_successful + +############################################################################# +# BISTS +# All bist_* methods must return True/False success values! +############################################################################# +    def bist_rtc(self): +        """ +        BIST for RTC (real time clock) + +        Return dictionary: +        - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 +                format, as a string. As if running 'date -Iseconds -u'. +        - time: Same time, but in seconds since epoch. + +        Return status: +        Unless datetime throws an exception, returns True. +        """ +        assert 'rtc' in self.tests_to_run +        utc_now = datetime.utcnow() +        return True, { +            'time': time.mktime(utc_now.timetuple()), +            'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", +        } | 
