From bf353515b6f26fa280f6a8d3fed477cc97e4a7ec Mon Sep 17 00:00:00 2001 From: Martin Braun Date: Fri, 12 Oct 2018 17:51:54 -0700 Subject: mpm: e320: n3xx: Factor BIST code to common module --- mpm/python/e320_bist | 561 ++++------------------------------- mpm/python/n3xx_bist | 697 +++++++++----------------------------------- mpm/python/usrp_mpm/bist.py | 598 +++++++++++++++++++++++++++++++++++++ 3 files changed, 786 insertions(+), 1070 deletions(-) create mode 100644 mpm/python/usrp_mpm/bist.py (limited to 'mpm') diff --git a/mpm/python/e320_bist b/mpm/python/e320_bist index ef7c85980..ad5b2163a 100755 --- a/mpm/python/e320_bist +++ b/mpm/python/e320_bist @@ -6,21 +6,12 @@ # """ E320 Built-In Self Test (BIST) - """ from __future__ import print_function -import os import sys -import subprocess -import re -import socket -import select import time -import json -from datetime import datetime -import argparse -from six import iteritems +from usrp_mpm import bist # Timeout values are in seconds: GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute" @@ -29,113 +20,30 @@ GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test # reduce this value in order for the BIST to pass faster # by default. # Temperature Sensor Mapping based on location -temp_sensor_map = { +TEMP_SENSOR_MAP = { "thermal_zone0" : "internal", "thermal_zone1" : "rf_channelA", "thermal_zone2" : "fpga", "thermal_zone3" : "rf_channelB", "thermal_zone4" : "main_power" } -############################################################################## -# Aurora/SFP BIST code -############################################################################## -def get_sfp_bist_defaults(): - " Default dictionary for SFP/Aurora BIST dry-runs " - return { - 'elapsed_time': 1.0, - 'max_roundtrip_latency': 0.8e-6, - 'throughput': 1000e6, - 'max_ber': 8.5e-11, - 'errors': 0, - 'bits': 12012486656, - } - -def assert_aurora_image(master, slave): - """ - Make sure we have an FPGA image with which we can run the requested tests. - - Will load an AA image if not, which always satisfies all conditions for - running Aurora tests. - """ - from usrp_mpm.sys_utils import uio - if not uio.find_uio_device(master)[0] or \ - (slave is not None and not uio.find_uio_device(slave)[0]): - load_fpga_image('AA') - -def run_aurora_bist(master, slave=None): - """ - Spawn a BER test - """ - from usrp_mpm import aurora_control - from usrp_mpm.sys_utils.uio import open_uio - - class DummyContext(object): - """Dummy class for context managers""" - def __enter__(self): - return - - def __exit__(self, exc_type, exc_value, traceback): - return exc_type is None - - # Go, go, go! - try: - assert_aurora_image(master, slave) - with open_uio(label=master, read_only=False) as master_au_uio: - master_au_ctrl = aurora_control.AuroraControl(master_au_uio) - with open_uio(label=slave, read_only=False)\ - if slave is not None else DummyContext() as slave_au_uio: - slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\ - if slave is not None else None - return master_au_ctrl.run_ber_loopback_bist( - duration=10, - requested_rate=1300 * 8e6, - slave=slave_au_ctrl, - ) - except Exception as ex: - print("Unexpected exception: {}".format(str(ex))) - exit(1) - - -def aurora_results_to_status(bist_results): - """ - Convert a dictionary coming from AuroraControl BIST to one that we can use - for this BIST - """ - return bist_results['mst_errors'] == 0, { - 'elapsed_time': bist_results['time_elapsed'], - 'max_roundtrip_latency': bist_results['mst_latency_us'], - 'throughput': bist_results['approx_throughput'], - 'max_ber': bist_results['max_ber'], - 'errors': bist_results['mst_errors'], - 'bits': bist_results['mst_samps'], - } ############################################################################## -# Helpers +# Bist class ############################################################################## -def post_results(results): - """ - Given a dictionary, post the results. - - This will print the results as JSON to stdout. - """ - print(json.dumps( - results, - sort_keys=True, - indent=4, - separators=(',', ': ') - )) - -def filter_results_for_lv(results): +class E320BIST(bist.UsrpBIST): """ - The LabView JSON parser does not support a variety of things, such as - nested dicts, and some downstream LV applications freak out if certain keys - are not what they expect. - This is a long hard-coded list of how results should look like for those - cases. Note: This list needs manual supervision and attention for the case - where either subsystems get renamed, or other architectural changes should - occur. + BIST Tool for the USRP E320 """ + usrp_type = "E320" + # This defines special tests that are really collections of other tests. + collections = { + 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", + "ref_clock_int"], + 'extended': "*", + } + # Default FPGA image type + DEFAULT_FPGA_TYPE = '1G' lv_compat_format = { 'ddr3': { 'throughput': -1, @@ -172,250 +80,35 @@ def filter_results_for_lv(results): 'read_patterns': [], }, 'temp': { - temp_sensor_map['thermal_zone0']: -1, - temp_sensor_map['thermal_zone1']: -1, - temp_sensor_map['thermal_zone2']: -1, - temp_sensor_map['thermal_zone3']: -1, - temp_sensor_map['thermal_zone4']: -1, + TEMP_SENSOR_MAP['thermal_zone0']: -1, + TEMP_SENSOR_MAP['thermal_zone1']: -1, + TEMP_SENSOR_MAP['thermal_zone2']: -1, + TEMP_SENSOR_MAP['thermal_zone3']: -1, + TEMP_SENSOR_MAP['thermal_zone4']: -1, }, 'fan': { 'cooling_device0': -1, }, } - # OK now go and brush up the results: - def fixup_dict(result_dict, ref_dict): - """ - Touches up result_dict according to ref_dict by the following rules: - - If a key is in result_dict that is not in ref_dict, delete that - - If a key is in ref_dict that is not in result_dict, use the value - from ref_dict - """ - ref_dict['error_msg'] = "" - ref_dict['status'] = False - result_dict = { - k: v for k, v in iteritems(result_dict) - if k in ref_dict or k in ('error_msg', 'status') - } - result_dict = { - k: result_dict.get(k, ref_dict[k]) for k in ref_dict - } - return result_dict - results = { - testname: fixup_dict(testresults, lv_compat_format[testname]) \ - if testname in lv_compat_format else testresults - for testname, testresults in iteritems(results) - } - return results - -def sock_read_line(my_sock, timeout=60, interval=0.1): - """ - Read from a socket until newline. If there was no newline until the timeout - occurs, raise an error. Otherwise, return the line. - """ - line = b'' - end_time = time.time() + timeout - while time.time() < end_time: - socket_ready = select.select([my_sock], [], [], 0)[0] - if socket_ready: - next_char = my_sock.recv(1) - if next_char == b'\n': - return line.decode('ascii') - line += next_char - else: - time.sleep(interval) - raise RuntimeError("sock_read_line() exceeded read timeout!") - -def poll_with_timeout(state_check, timeout_ms, interval_ms): - """ - Calls state_check() every interval_ms until it returns a positive value, or - until a timeout is exceeded. - - Returns True if state_check() returned True within the timeout. - """ - max_time = time.time() + (float(timeout_ms) / 1000) - interval_s = float(interval_ms) / 1000 - while time.time() < max_time: - if state_check(): - return True - time.sleep(interval_s) - return False - -def expand_options(option_list): - """ - Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', - 'spam': 'eggs'}. - """ - return dict(x.split('=') for x in option_list) + device_args = "type=e3xx,addr=127.0.0.1" -############################################################################## -# Bist class -############################################################################## -class E320BIST(object): - """ - BIST Tool for the USRP E320 - """ - # This defines special tests that are really collections of other tests. - collections = { - 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", "ref_clock_int"], - 'extended': "*", - } - # Default FPGA image type - DEFAULT_FPGA_TYPE = '1G' + def __init__(self): + super(E320BIST, self).__init__() - @staticmethod - def make_arg_parser(): - """ - Return arg parser - """ - parser = argparse.ArgumentParser( - description="E320 BIST Tool", - ) - parser.add_argument( - '-n', '--dry-run', action='store_true', - help="Fake out the tests. All tests will return a valid" \ - " response, but will not actually interact with hardware.", - ) - parser.add_argument( - '-v', '--verbose', action='store_true', - help="Crank up verbosity level", - ) - parser.add_argument( - '--debug', action='store_true', - help="For debugging this tool.", - ) - parser.add_argument( - '--option', '-o', action='append', default=[], - help="Option for individual test.", - ) - parser.add_argument( - '--lv-compat', action='store_true', - help="Provides compatibility with the LV JSON parser. Don't run " - "this mode unless you know what you're doing. The JSON " - "output does not necessarily reflect the actual system " - "status when using this mode.", - ) - parser.add_argument( - '--skip-fpga-reload', action='store_true', - help="Skip reloading the default FPGA image post-test. Note: by" - "specifying this argument, the FPGA image loaded could be " - "anything post-test.", - ) - parser.add_argument( - 'tests', - help="List the tests that should be run", - nargs='+', # There has to be at least one - ) - return parser + def get_mb_periph_mgr(self): + """Return reference to an e320 periph manager""" + from usrp_mpm.periph_manager.e320 import e320 + return e320 - def __init__(self): - self.args = E320BIST.make_arg_parser().parse_args() - self.args.option = expand_options(self.args.option) - # If this is true, trigger a reload of the default FPGA image - self.reload_fpga_image = False - try: - from usrp_mpm.periph_manager.e320 import e320 - default_rev = e320.mboard_max_rev - except ImportError: - # This means we're in dry run mode or something like that, so just - # pick something - default_rev = 3 - self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) - self.tests_to_run = set() - for test in self.args.tests: - if test in self.collections: - for test in self.expand_collection(test): - self.tests_to_run.add(test) - else: - self.tests_to_run.add(test) - try: - # Keep this import here so we can do dry-runs without any MPM code - from usrp_mpm import get_main_logger - if not self.args.verbose: - from usrp_mpm.mpmlog import WARNING - get_main_logger().setLevel(WARNING) - self.log = get_main_logger().getChild('main') - except ImportError: - print("No logging capability available.") - - def expand_collection(self, coll): - """ - Return names of tests in a collection - """ - tests = self.collections[coll] - if tests == "*": - tests = {x.replace('bist_', '') - for x in dir(self) - if x.find('bist_') == 0 - } - else: - tests = set(tests) - return tests - - def run(self): - """ - Execute tests. + def get_product_id(self): + """Return the mboard product ID (e320):""" + return bist.get_product_id_from_eeprom(valid_ids=['e320']) - Returns True on Success. - """ - def execute_test(testname): - """ - Actually run a test. - """ - testmethod_name = "bist_{0}".format(testname) - sys.stderr.write( - "Executing test method: {0}\n\n".format(testmethod_name) - ) - try: - status, data = getattr(self, testmethod_name)() - data['status'] = status - data['error_msg'] = data.get('error_msg', '') - return status, data - except AttributeError: - sys.stderr.write("Test not defined: {}\n".format(testname)) - return False, {} - except Exception as ex: - sys.stderr.write( - "Test {} failed to execute: {}\n".format(testname, str(ex)) - ) - if self.args.debug: - raise - return False, {'error_msg': str(ex)} - tests_successful = True - result = {} - for test in self.tests_to_run: - status, result_data = execute_test(test) - tests_successful = tests_successful and status - result[test] = result_data - if self.args.lv_compat: - result = filter_results_for_lv(result) - post_results(result) - if self.reload_fpga_image and not self.args.skip_fpga_reload: - load_fpga_image(self.DEFAULT_FPGA_TYPE) - return tests_successful ############################################################################# # BISTS # All bist_* methods must return True/False success values! ############################################################################# - def bist_rtc(self): - """ - BIST for RTC (real time clock) - - Return dictionary: - - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 - format, as a string. As if running 'date -Iseconds -u'. - - time: Same time, but in seconds since epoch. - - Return status: - Unless datetime throws an exception, returns True. - """ - assert 'rtc' in self.tests_to_run - utc_now = datetime.utcnow() - return True, { - 'time': time.mktime(utc_now.timetuple()), - 'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", - } - def bist_gyro(self): """ BIST for GYRO (MPU9250) @@ -458,25 +151,7 @@ class E320BIST(object): assert 'ddr3' in self.tests_to_run if self.args.dry_run: return True, {'throughput': 1250e6} - result = {} - ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' - try: - output = subprocess.check_output( - ddr3_bist_executor, - stderr=subprocess.STDOUT, - shell=True, - ) - except subprocess.CalledProcessError as ex: - # Don't throw errors from uhd_usrp_probe - output = ex.output - output = output.decode("utf-8") - mobj = re.search(r"Throughput: (?P[0-9.]+)\s?MB", output) - if mobj is not None: - result['throughput'] = float(mobj.group('thrup')) * 1000 - else: - result['throughput'] = 0 - result['error_msg'] = result.get('error_msg', '') + \ - "\n\nFailed match throughput regex!" + result = bist.test_ddr3_with_usrp_probe() return result.get('throughput', 0) > 1000e3, result def bist_gpsdo(self): @@ -522,7 +197,7 @@ class E320BIST(object): sys.stderr.write( "Waiting for WARMUP to go low for up to {} seconds...\n".format( gps_warmup_timeout)) - if not poll_with_timeout( + if not bist.poll_with_timeout( lambda: not bool((mb_regs.get_gps_status() >> 4) & 0x1), gps_warmup_timeout*1000, 1000 ): @@ -534,7 +209,7 @@ class E320BIST(object): sys.stderr.write( "Waiting for LOCKOK to go high for up to {} seconds...\n".format( gps_lockok_timeout)) - if not poll_with_timeout( + if not bist.poll_with_timeout( mb_regs.get_gps_locked_val, gps_lockok_timeout*1000, 1000 @@ -550,24 +225,8 @@ class E320BIST(object): sys.stderr.write("GPS-ALARM status: {}\n".format( (gps_status >> 1) & 0x1 )) - # Now read back response from chip - my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - my_sock.connect(('localhost', 2947)) - sys.stderr.write("Connected to GPSDO socket.\n") - query_cmd = b'?WATCH={"enable":true,"json":true}' - my_sock.sendall(query_cmd) - sys.stderr.write("Sent query: {}\n".format(query_cmd)) - sock_read_line(my_sock, timeout=10) - sys.stderr.write("Received initial newline.\n") - result = {} - while result.get('class', None) != 'TPV': - json_result = sock_read_line(my_sock, timeout=60) - sys.stderr.write( - "Received JSON response: {}\n\n".format(json_result) - ) - result = json.loads(json_result) - my_sock.sendall(b'?WATCH={"enable":false}') - my_sock.close() + # Now the chip is on, read back the TPV result + result = bist.get_gpsd_tpv_result() # If we reach this line, we have a valid result and the chip responded. # However, it doesn't necessarily mean we had a GPS lock. return True, result @@ -589,60 +248,9 @@ class E320BIST(object): return True, { 'tpm0_caps': "Fake caps value\n\nVersion 0.0.0", } - result = {} - props_to_read = ('caps',) - base_path = '/sys/class/tpm' - for tpm_device in os.listdir(base_path): - if tpm_device.startswith('tpm'): - for key in props_to_read: - result['{}_{}'.format(tpm_device, key)] = open( - os.path.join(base_path, tpm_device, key), 'r' - ).read().strip() + result = bist.get_tpm_caps_info() return len(result) == 1, result - def ref_clock_helper(self,clock_source): - """ - Helper function to determine reference clock lock - Description: Checks to see if we can lock to a clock source. - - External Equipment: None - Return dictionary: - - : - - locked: Boolean lock status - """ - assert clock_source in ("internal", "external"),\ - "Invalid clock source selected ({}). Valid choices: {}".format( - clock_source, ("internal", "external")) - if self.args.dry_run: - return True, {'ref_locked': True} - result = {} - env = os.environ.copy() - env['UHD_LOG_CONSOLE_LEVEL'] = 'error' - cmd = ['uhd_usrp_probe', '--args', 'addr=127.0.0.1,clock_source=' + clock_source, - '--sensor'] - sensor_path = '/mboards/0/sensors/ref_locked' - cmd.append(sensor_path) - ref_lock_executor = ' '.join(cmd) - try: - output = subprocess.check_output( - ref_lock_executor, - stderr=subprocess.STDOUT, - env=env, - shell=True, - ) - except subprocess.CalledProcessError as ex: - # Don't throw errors from uhd_usrp_probe - output = ex.output - output = output.decode("utf-8") - mobj = re.search(r"true$", output.strip()) - if mobj is not None: - result['ref_locked'] = True - else: - result['ref_locked'] = False - result['error_msg'] = ("Reference Clock not locked." - " Extra output:" + output) - return result - def bist_ref_clock_int(self): """ BIST for clock lock from internal (20MHz). @@ -658,7 +266,9 @@ class E320BIST(object): need to be asserted. """ assert 'ref_clock_int' in self.tests_to_run - result = self.ref_clock_helper('internal') + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop('internal', 'internal') return 'error_msg' not in result, result def bist_ref_clock_ext(self): @@ -676,7 +286,9 @@ class E320BIST(object): need to be asserted. """ assert 'ref_clock_ext' in self.tests_to_run - result = self.ref_clock_helper('external') + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop('external', 'external') return 'error_msg' not in result, result def bist_sfp_loopback(self): @@ -696,10 +308,14 @@ class E320BIST(object): - bits: Number of bits that were transferred """ if self.args.dry_run: - return True, get_sfp_bist_defaults() - sfp_bist_results = run_aurora_bist(master='misc-auro-regs') + return True, bist.get_sfp_bist_defaults() + sfp_bist_results = bist.run_aurora_bist( + device_args=self.device_args, + product_id=self.get_product_id(), + master='misc-auro-regs', + ) self.reload_fpga_image = True - return aurora_results_to_status(sfp_bist_results) + return bist.aurora_results_to_status(sfp_bist_results) def bist_gpio(self): """ @@ -737,7 +353,7 @@ class E320BIST(object): " Run a GPIO test for a given set of patterns " gpio_ctrl = e320_periphs.FrontpanelGPIO(ddr) for pattern in patterns: - gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) + bist.gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) time.sleep(0.1) gpio_rb = gpio_ctrl.get_all() if pattern != gpio_rb: @@ -763,16 +379,8 @@ class E320BIST(object): assert 'temp' in self.tests_to_run if self.args.dry_run: return True, {'internal': 30000} - import pyudev - context = pyudev.Context() - result = { - temp_sensor_map[device.sys_name]: \ - int(device.attributes.get('temp').decode('ascii')) - for device in context.list_devices(subsystem='thermal') - - if 'temp' in device.attributes.available_attributes \ - and device.attributes.get('temp') is not None - } + result = bist.get_temp_sensor_value( + lambda device: TEMP_SENSOR_MAP[device.sys_name]) if len(result) < 1: result['error_msg'] = "No temperature sensors found!" return 'error_msg' not in result, result @@ -790,14 +398,7 @@ class E320BIST(object): assert 'fan' in self.tests_to_run if self.args.dry_run: return True, {'cooling_device0': 10000} - import pyudev - context = pyudev.Context() - result = { - device.sys_name: int(device.attributes.get('cur_state')) - for device in context.list_devices(subsystem='thermal') - if 'cur_state' in device.attributes.available_attributes \ - and device.attributes.get('cur_state') is not None - } + result = bist.get_fan_values() return len(result) == 1, result def bist_link_up(self): @@ -812,65 +413,9 @@ class E320BIST(object): assert 'link_up' in self.tests_to_run if self.args.dry_run: return True, {'sfp0': 'UP'} - from pyroute2 import IPRoute - result = {} - with IPRoute() as ipr: - links = ipr.link_lookup(ifname='sfp0') - if not links: - return False, {'error_msg': "No interface found"} - link_info = next(iter(ipr.get_links(links)), None) - if link_info == None: - return False, {'error_msg': "Error on get_links for sfp0"} - result['sfp0'] = link_info.get_attr('IFLA_OPERSTATE') - if result['sfp0'] != 'UP': - result['error_msg'] = "Link not up for interface" + result = bist.get_link_up('sfp0') return 'error_msg' not in result, result - -def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): - """Helper function for set gpio. - What this function do is take decimal value and convert to a binary string - then try to set those individual bits to the gpio_bank. - Arguments: - gpio_bank -- gpio bank type. - value -- value to set onto gpio bank. - gpio_size -- size of the gpio bank - ddr_mask -- data direction register bit mask. 0 is input; 1 is output. - """ - ddr_size = bin(ddr_mask).count("1") - value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] - ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] - for i in range(gpio_size): - if ddr_bitstring[gpio_size - 1 - i] == "1": - gpio_bank.set(i, value_bitstring[i % ddr_size]) - -def get_product_id(): - """Return the mboard product ID (e320):""" - cmd = ['eeprom-id'] - output = subprocess.check_output( - cmd, - stderr=subprocess.STDOUT, - shell=True, - ).decode('utf-8') - if 'e320' in output: - return 'e320' - raise AssertionError("Cannot determine product ID.") - -def load_fpga_image(fpga_type): - """Load an FPGA image (1G, XG, AA, ...)""" - cmd = ['uhd_image_loader', '--args', 'type=e3xx,addr=127.0.0.1', '--fpga-path'] - images_folder = '/usr/share/uhd/images/' - fpga_file_name = \ - 'usrp_' + get_product_id() + '_fpga_' + fpga_type.upper() + '.bit' - fpga_image = images_folder + fpga_file_name - cmd.append(fpga_image) - cmd_str = ' '.join(cmd) - subprocess.check_output( - cmd_str, - stderr=subprocess.STDOUT, - shell=True - ) - ############################################################################## # main ############################################################################## diff --git a/mpm/python/n3xx_bist b/mpm/python/n3xx_bist index 18f1c3b4b..cc7dd40c6 100755 --- a/mpm/python/n3xx_bist +++ b/mpm/python/n3xx_bist @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2017 Ettus Research, National Instruments Company +# Copyright 2017-2018 Ettus Research, National Instruments Company # # SPDX-License-Identifier: GPL-3.0-or-later # @@ -11,17 +11,9 @@ Will work on all derivatives of the N3xx series. """ from __future__ import print_function -import os import sys -import subprocess -import re -import socket -import select import time -import json -from datetime import datetime -import argparse -from six import iteritems +from usrp_mpm import bist # Timeout values are in seconds: GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute" @@ -31,105 +23,20 @@ GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test # by default. ############################################################################## -# Aurora/SFP BIST code +# Bist class ############################################################################## -def get_sfp_bist_defaults(): - " Default dictionary for SFP/Aurora BIST dry-runs " - return { - 'elapsed_time': 1.0, - 'max_roundtrip_latency': 0.8e-6, - 'throughput': 1000e6, - 'max_ber': 8.5e-11, - 'errors': 0, - 'bits': 12012486656, - } - -def assert_aurora_image(master, slave): - """ - Make sure we have an FPGA image with which we can run the requested tests. - - Will load an AA image if not, which always satisfies all conditions for - running Aurora tests. - """ - from usrp_mpm.sys_utils import uio - if not uio.find_uio_device(master)[0] or \ - (slave is not None and not uio.find_uio_device(slave)[0]): - load_fpga_image('AA') - -def run_aurora_bist(master, slave=None): +class N3XXBIST(bist.UsrpBIST): """ - Spawn a BER test - """ - from usrp_mpm import aurora_control - from usrp_mpm.sys_utils.uio import open_uio - - class DummyContext(object): - """Dummy class for context managers""" - def __enter__(self): - return - - def __exit__(self, exc_type, exc_value, traceback): - return exc_type is None - - # Go, go, go! - try: - assert_aurora_image(master, slave) - with open_uio(label=master, read_only=False) as master_au_uio: - master_au_ctrl = aurora_control.AuroraControl(master_au_uio) - with open_uio(label=slave, read_only=False)\ - if slave is not None else DummyContext() as slave_au_uio: - slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\ - if slave is not None else None - return master_au_ctrl.run_ber_loopback_bist( - duration=10, - requested_rate=1300 * 8e6, - slave=slave_au_ctrl, - ) - except Exception as ex: - print("Unexpected exception: {}".format(str(ex))) - exit(1) - - -def aurora_results_to_status(bist_results): - """ - Convert a dictionary coming from AuroraControl BIST to one that we can use - for this BIST + BIST Tool for the USRP N3xx series """ - return bist_results['mst_errors'] == 0, { - 'elapsed_time': bist_results['time_elapsed'], - 'max_roundtrip_latency': bist_results['mst_latency_us'], - 'throughput': bist_results['approx_throughput'], - 'max_ber': bist_results['max_ber'], - 'errors': bist_results['mst_errors'], - 'bits': bist_results['mst_samps'], + usrp_type = "N3XX" + # This defines special tests that are really collections of other tests. + collections = { + 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm"], + 'extended': "*", } - -############################################################################## -# Helpers -############################################################################## -def post_results(results): - """ - Given a dictionary, post the results. - - This will print the results as JSON to stdout. - """ - print(json.dumps( - results, - sort_keys=True, - indent=4, - separators=(',', ': ') - )) - -def filter_results_for_lv(results): - """ - The LabView JSON parser does not support a variety of things, such as - nested dicts, and some downstream LV applications freak out if certain keys - are not what they expect. - This is a long hard-coded list of how results should look like for those - cases. Note: This list needs manual supervision and attention for the case - where either subsystems get renamed, or other architectural changes should - occur. - """ + # Default FPGA image type + DEFAULT_FPGA_TYPE = 'HG' lv_compat_format = { 'ddr3': { 'throughput': -1, @@ -184,240 +91,24 @@ def filter_results_for_lv(results): 'lock_status': 0, }, } - # OK now go and brush up the results: - def fixup_dict(result_dict, ref_dict): - """ - Touches up result_dict according to ref_dict by the following rules: - - If a key is in result_dict that is not in ref_dict, delete that - - If a key is in ref_dict that is not in result_dict, use the value - from ref_dict - """ - ref_dict['error_msg'] = "" - ref_dict['status'] = False - result_dict = { - k: v for k, v in iteritems(result_dict) - if k in ref_dict or k in ('error_msg', 'status') - } - result_dict = { - k: result_dict.get(k, ref_dict[k]) for k in ref_dict - } - return result_dict - results = { - testname: fixup_dict(testresults, lv_compat_format[testname]) \ - if testname in lv_compat_format else testresults - for testname, testresults in iteritems(results) - } - return results - -def sock_read_line(my_sock, timeout=60, interval=0.1): - """ - Read from a socket until newline. If there was no newline until the timeout - occurs, raise an error. Otherwise, return the line. - """ - line = b'' - end_time = time.time() + timeout - while time.time() < end_time: - socket_ready = select.select([my_sock], [], [], 0)[0] - if socket_ready: - next_char = my_sock.recv(1) - if next_char == b'\n': - return line.decode('ascii') - line += next_char - else: - time.sleep(interval) - raise RuntimeError("sock_read_line() exceeded read timeout!") - -def poll_with_timeout(state_check, timeout_ms, interval_ms): - """ - Calls state_check() every interval_ms until it returns a positive value, or - until a timeout is exceeded. - - Returns True if state_check() returned True within the timeout. - """ - max_time = time.time() + (float(timeout_ms) / 1000) - interval_s = float(interval_ms) / 1000 - while time.time() < max_time: - if state_check(): - return True - time.sleep(interval_s) - return False - -def expand_options(option_list): - """ - Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', - 'spam': 'eggs'}. - """ - return dict(x.split('=') for x in option_list) - -############################################################################## -# Bist class -############################################################################## -class N3XXBIST(object): - """ - BIST Tool for the USRP N3xx series - """ - # This defines special tests that are really collections of other tests. - collections = { - 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm"], - 'extended': "*", - } - # Default FPGA image type - DEFAULT_FPGA_TYPE = 'HG' - - @staticmethod - def make_arg_parser(): - """ - Return arg parser - """ - parser = argparse.ArgumentParser( - description="N3xx BIST Tool", - ) - parser.add_argument( - '-n', '--dry-run', action='store_true', - help="Fake out the tests. All tests will return a valid" \ - " response, but will not actually interact with hardware.", - ) - parser.add_argument( - '-v', '--verbose', action='store_true', - help="Crank up verbosity level", - ) - parser.add_argument( - '--debug', action='store_true', - help="For debugging this tool.", - ) - parser.add_argument( - '--option', '-o', action='append', default=[], - help="Option for individual test.", - ) - parser.add_argument( - '--lv-compat', action='store_true', - help="Provides compatibility with the LV JSON parser. Don't run " - "this mode unless you know what you're doing. The JSON " - "output does not necessarily reflect the actual system " - "status when using this mode.", - ) - parser.add_argument( - '--skip-fpga-reload', action='store_true', - help="Skip reloading the default FPGA image post-test. Note: by" - "specifying this argument, the FPGA image loaded could be " - "anything post-test.", - ) - parser.add_argument( - 'tests', - help="List the tests that should be run", - nargs='+', # There has to be at least one - ) - return parser + device_args = "type=n3xx,addr=127.0.0.1" def __init__(self): - self.args = N3XXBIST.make_arg_parser().parse_args() - self.args.option = expand_options(self.args.option) - # If this is true, trigger a reload of the default FPGA image - self.reload_fpga_image = False - try: - from usrp_mpm.periph_manager.n3xx import n3xx - default_rev = n3xx.mboard_max_rev - except ImportError: - # This means we're in dry run mode or something like that, so just - # pick something - default_rev = 3 - self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) - self.tests_to_run = set() - for test in self.args.tests: - if test in self.collections: - for test in self.expand_collection(test): - self.tests_to_run.add(test) - else: - self.tests_to_run.add(test) - try: - # Keep this import here so we can do dry-runs without any MPM code - from usrp_mpm import get_main_logger - if not self.args.verbose: - from usrp_mpm.mpmlog import WARNING - get_main_logger().setLevel(WARNING) - self.log = get_main_logger().getChild('main') - except ImportError: - print("No logging capability available.") - - def expand_collection(self, coll): - """ - Return names of tests in a collection - """ - tests = self.collections[coll] - if tests == "*": - tests = {x.replace('bist_', '') - for x in dir(self) - if x.find('bist_') == 0 - } - else: - tests = set(tests) - return tests - - def run(self): - """ - Execute tests. + bist.UsrpBIST.__init__(self) - Returns True on Success. - """ - def execute_test(testname): - """ - Actually run a test. - """ - testmethod_name = "bist_{0}".format(testname) - sys.stderr.write( - "Executing test method: {0}\n\n".format(testmethod_name) - ) - try: - status, data = getattr(self, testmethod_name)() - data['status'] = status - data['error_msg'] = data.get('error_msg', '') - return status, data - except AttributeError: - sys.stderr.write("Test not defined: {}\n".format(testname)) - return False, {} - except Exception as ex: - sys.stderr.write( - "Test {} failed to execute: {}\n".format(testname, str(ex)) - ) - if self.args.debug: - raise - return False, {'error_msg': str(ex)} - tests_successful = True - result = {} - for test in self.tests_to_run: - status, result_data = execute_test(test) - tests_successful = tests_successful and status - result[test] = result_data - if self.args.lv_compat: - result = filter_results_for_lv(result) - post_results(result) - if self.reload_fpga_image and not self.args.skip_fpga_reload: - load_fpga_image(self.DEFAULT_FPGA_TYPE) - return tests_successful + def get_mb_periph_mgr(self): + """Return reference to an n3xx periph manager""" + from usrp_mpm.periph_manager.n3xx import n3xx + return n3xx + + def get_product_id(self): + """Return the mboard product ID (n310 or n300):""" + return bist.get_product_id_from_eeprom(valid_ids=['n300', 'n310']) ############################################################################# # BISTS # All bist_* methods must return True/False success values! ############################################################################# - def bist_rtc(self): - """ - BIST for RTC (real time clock) - - Return dictionary: - - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 - format, as a string. As if running 'date -Iseconds -u'. - - time: Same time, but in seconds since epoch. - - Return status: - Unless datetime throws an exception, returns True. - """ - assert 'rtc' in self.tests_to_run - utc_now = datetime.utcnow() - return True, { - 'time': time.mktime(utc_now.timetuple()), - 'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", - } - def bist_ddr3(self): """ BIST for PL DDR3 DRAM @@ -437,25 +128,7 @@ class N3XXBIST(object): assert 'ddr3' in self.tests_to_run if self.args.dry_run: return True, {'throughput': 1250e6} - result = {} - ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' - try: - output = subprocess.check_output( - ddr3_bist_executor, - stderr=subprocess.STDOUT, - shell=True, - ) - except subprocess.CalledProcessError as ex: - # Don't throw errors from uhd_usrp_probe - output = ex.output - output = output.decode("utf-8") - mobj = re.search(r"Throughput: (?P[0-9.]+)\s?MB", output) - if mobj is not None: - result['throughput'] = float(mobj.group('thrup')) * 1000 - else: - result['throughput'] = 0 - result['error_msg'] = result.get('error_msg', '') + \ - "\n\nFailed match throughput regex!" + result = bist.test_ddr3_with_usrp_probe() return result.get('throughput', 0) > 1000e3, result def bist_gpsdo(self): @@ -501,7 +174,7 @@ class N3XXBIST(object): sys.stderr.write( "Waiting for WARMUP to go low for up to {} seconds...\n".format( gps_warmup_timeout)) - if not poll_with_timeout( + if not bist.poll_with_timeout( lambda: not gpio_tca6424.get('GPS-WARMUP'), gps_warmup_timeout*1000, 1000 ): @@ -513,7 +186,7 @@ class N3XXBIST(object): sys.stderr.write( "Waiting for LOCKOK to go high for up to {} seconds...\n".format( gps_lockok_timeout)) - if not poll_with_timeout( + if not bist.poll_with_timeout( lambda: gpio_tca6424.get('GPS-LOCKOK'), gps_lockok_timeout*1000, 1000 @@ -528,24 +201,8 @@ class N3XXBIST(object): sys.stderr.write("GPS-ALARM status: {}\n".format( gpio_tca6424.get('GPS-ALARM') )) - # Now read back response from chip - my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - my_sock.connect(('localhost', 2947)) - sys.stderr.write("Connected to GPSDO socket.\n") - query_cmd = b'?WATCH={"enable":true,"json":true}' - my_sock.sendall(query_cmd) - sys.stderr.write("Sent query: {}\n".format(query_cmd)) - sock_read_line(my_sock, timeout=10) - sys.stderr.write("Received initial newline.\n") - result = {} - while result.get('class', None) != 'TPV': - json_result = sock_read_line(my_sock, timeout=60) - sys.stderr.write( - "Received JSON response: {}\n\n".format(json_result) - ) - result = json.loads(json_result) - my_sock.sendall(b'?WATCH={"enable":false}') - my_sock.close() + # Now the chip is on, read back the TPV result + result = bist.get_gpsd_tpv_result() # If we reach this line, we have a valid result and the chip responded. # However, it doesn't necessarily mean we had a GPS lock. return True, result @@ -567,17 +224,87 @@ class N3XXBIST(object): return True, { 'tpm0_caps': "Fake caps value\n\nVersion 0.0.0", } - result = {} - props_to_read = ('caps',) - base_path = '/sys/class/tpm' - for tpm_device in os.listdir(base_path): - if tpm_device.startswith('tpm'): - for key in props_to_read: - result['{}_{}'.format(tpm_device, key)] = open( - os.path.join(base_path, tpm_device, key), 'r' - ).read().strip() + result = bist.get_tpm_caps_info() return len(result) == 1, result + def bist_ref_clock_int(self): + """ + BIST for clock lock from internal (25 MHz) source. + Description: Checks to see if the daughtercard can lock to an internal + clock source. + + External Equipment: None + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_int' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop( + 'internal', + 'internal', + extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0} + ) + return 'error_msg' not in result, result + + def bist_ref_clock_ext(self): + """ + BIST for clock lock from external source. Note: This test requires a + connected daughterboard with a 'ref lock' sensor available. + + Description: Checks to see if the daughtercard can lock to the external + reference clock. + + External Equipment: 10 MHz reference Source connected to "ref in". + + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_ext' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop( + 'external', + 'external', + extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0} + ) + return 'error_msg' not in result, result + + def bist_ref_clock_gpsdo(self): + """ + BIST for clock lock from external source. Note: This test requires a + connected daughterboard with a 'ref lock' sensor available. + + Description: Checks to see if the daughtercard can lock to the external + reference clock. + + External Equipment: 10 MHz reference Source connected to "ref in". + + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_gpsdo' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop( + 'gpsdo', + 'gpsdo', + extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0} + ) + return 'error_msg' not in result, result + def bist_sfp0_loopback(self): """ BIST for SFP+ ports: @@ -596,10 +323,14 @@ class N3XXBIST(object): - bits: Number of bits that were transferred """ if self.args.dry_run: - return True, get_sfp_bist_defaults() - sfp_bist_results = run_aurora_bist(master='misc-auro-regs0') + return True, bist.get_sfp_bist_defaults() + sfp_bist_results = bist.run_aurora_bist( + device_args=self.device_args, + product_id=self.get_product_id(), + master='misc-auro-regs0', + ) self.reload_fpga_image = True - return aurora_results_to_status(sfp_bist_results) + return bist.aurora_results_to_status(sfp_bist_results) def bist_sfp1_loopback(self): """ @@ -619,10 +350,14 @@ class N3XXBIST(object): - bits: Number of bits that were transferred """ if self.args.dry_run: - return True, get_sfp_bist_defaults() - sfp_bist_results = run_aurora_bist(master='misc-auro-regs1') + return True, bist.get_sfp_bist_defaults() + sfp_bist_results = bist.run_aurora_bist( + device_args=self.device_args, + product_id=self.get_product_id(), + master='misc-auro-regs1', + ) self.reload_fpga_image = True - return aurora_results_to_status(sfp_bist_results) + return bist.aurora_results_to_status(sfp_bist_results) def bist_sfp_loopback(self): """ @@ -642,13 +377,15 @@ class N3XXBIST(object): - bits: Number of bits that were transferred """ if self.args.dry_run: - return True, get_sfp_bist_defaults() - sfp_bist_results = run_aurora_bist( + return True, bist.get_sfp_bist_defaults() + sfp_bist_results = bist.run_aurora_bist( + device_args=self.device_args, + product_id=self.get_product_id(), master='misc-auro-regs0', slave='misc-auro-regs1', ) self.reload_fpga_image = True - return aurora_results_to_status(sfp_bist_results) + return bist.aurora_results_to_status(sfp_bist_results) def bist_gpio(self): """ @@ -690,7 +427,7 @@ class N3XXBIST(object): " Run a GPIO test for a given set of patterns " gpio_ctrl = n3xx_periphs.FrontpanelGPIO(ddr) for pattern in patterns: - gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) + bist.gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) time.sleep(0.1) gpio_rb = gpio_ctrl.get_all() if pattern != gpio_rb: @@ -716,15 +453,8 @@ class N3XXBIST(object): assert 'temp' in self.tests_to_run if self.args.dry_run: return True, {'fpga-thermal-zone': 30000} - import pyudev - context = pyudev.Context() - result = { - device.attributes.get('type').decode('ascii'): \ - int(device.attributes.get('temp').decode('ascii')) - for device in context.list_devices(subsystem='thermal') - if 'temp' in device.attributes.available_attributes \ - and device.attributes.get('temp') is not None - } + result = bist.get_temp_sensor_value( + lambda device: device.attributes.get('type').decode('ascii')) if len(result) < 1: result['error_msg'] = "No temperature sensors found!" return 'error_msg' not in result, result @@ -742,14 +472,7 @@ class N3XXBIST(object): assert 'fan' in self.tests_to_run if self.args.dry_run: return True, {'cooling_device0': 10000, 'cooling_device1': 10000} - import pyudev - context = pyudev.Context() - result = { - device.sys_name: int(device.attributes.get('cur_state')) - for device in context.list_devices(subsystem='thermal') - if 'cur_state' in device.attributes.available_attributes \ - and device.attributes.get('cur_state') is not None - } + result = bist.get_fan_values() return len(result) == 2, result def bist_whiterabbit(self): @@ -768,7 +491,11 @@ class N3XXBIST(object): from usrp_mpm.sys_utils import uio if not uio.find_uio_device(n3xx.wr_regs_label, logger=self.log)[0]: self.log.info("Need to load WX image before proceeding...") - load_fpga_image('WX') + bist.load_fpga_image( + 'WX', + self.device_args, + self.get_product_id(), + ) self.log.info("Image loading complete.") self.reload_fpga_image = True mb_regs = n3xx_periphs.MboardRegsControl( @@ -776,7 +503,7 @@ class N3XXBIST(object): mb_regs.set_time_source('sfp0', 25e6) wr_regs_control = WhiteRabbitRegsControl( n3xx.wr_regs_label, self.log) - lock_status = poll_with_timeout( + lock_status = bist.poll_with_timeout( lambda: wr_regs_control.get_time_lock_status(), 40000, # Try for x ms... this number is set from a few benchtop tests 1000, # Poll every... second! why not? @@ -786,160 +513,6 @@ class N3XXBIST(object): } return lock_status, result - def bist_ref_clock_int(self): - """ - BIST for clock and pps lock from internal (25MHz). - Description: Checks to see if we can lock to an internal - clock source. - - External Equipment: None - Return dictionary: - - : - - locked: Boolean lock status - - There can be multiple ref lock sensors; for a pass condition they all - need to be asserted. - """ - assert 'ref_clock_int' in self.tests_to_run - result = self.ref_clock_helper('internal', 'internal') - return 'error_msg' not in result, result - - def bist_ref_clock_ext(self): - """ - BIST for clock pps lock from external (10MHz) source. - Description: Checks to see if we can lock to an external - clock source. - - External Equipment: External 10 MHz reference clock needed (from Octoclock) - Return dictionary: - - : - - locked: Boolean lock status - - There can be multiple ref lock sensors; for a pass condition they all - need to be asserted. - """ - assert 'ref_clock_ext' in self.tests_to_run - result = self.ref_clock_helper('external', 'external') - return 'error_msg' not in result, result - - def bist_ref_clock_gpsdo(self): - """ - BIST for clock and pps lock from gpsdo (20MHz) source. - Description: Checks to see if we can lock to an external - clock source. - - External Equipment: None - Return dictionary: - - : - - locked: Boolean lock status - - There can be multiple ref lock sensors; for a pass condition they all - need to be asserted. - """ - assert 'ref_clock_gpsdo' in self.tests_to_run - result = self.ref_clock_helper('gpsdo', 'gpsdo') - return 'error_msg' not in result, result - - def ref_clock_helper(self, clock_source, time_source): - """ - Helper function to determine reference clock lock - Description: Checks to see if we can lock to a clock source. - External Equipment: None - Return dictionary: - - : - - locked: Boolean lock status - """ - assert clock_source in ("gpsdo", "internal", "external"),\ - "Invalid clock source selected ({}). Valid choices: {}".format( - clock_source, ("gpsdo", "internal", "external")) - - assert time_source in ("gpsdo", "internal", "external", "sfp0"),\ - "Invalid time source selected ({}). Valid choices: {}".format( - time_source, ("gpsdo", "internal", "external", "sfp0")) - - if self.args.dry_run: - return True, {'ref_locked': True} - result = {} - env = os.environ.copy() - env['UHD_LOG_CONSOLE_LEVEL'] = 'error' - cmd = ['uhd_usrp_probe', '--args', - 'addr=127.0.0.1,' - 'rfnoc_num_blocks=0,' - 'skip_rfic=1,' - 'clock_source={c},time_source={t}'.format(c=clock_source, - t=time_source), - '--sensor'] - sensor_path = '/mboards/0/sensors/ref_locked' - cmd.append(sensor_path) - ref_lock_executor = ' '.join(cmd) - try: - output = subprocess.check_output( - ref_lock_executor, - stderr=subprocess.STDOUT, - env=env, - shell=True, - ) - except subprocess.CalledProcessError as ex: - # Don't throw errors from uhd_usrp_probe - output = ex.output - output = output.decode("utf-8") - mobj = re.search(r"true$", output.strip()) - if mobj is not None: - result['ref_locked'] = True - else: - result['ref_locked'] = False - result['error_msg'] = ("Reference Clock not locked." - " Extra output:" + output) - return result - - - -def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): - """Helper function for set gpio. - What this function do is take decimal value and convert to a binary string - then try to set those individual bits to the gpio_bank. - Arguments: - gpio_bank -- gpio bank type. - value -- value to set onto gpio bank. - gpio_size -- size of the gpio bank - ddr_mask -- data direction register bit mask. 0 is input; 1 is output. - """ - ddr_size = bin(ddr_mask).count("1") - value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] - ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] - for i in range(gpio_size): - if ddr_bitstring[gpio_size - 1 - i] == "1": - gpio_bank.set(i, value_bitstring[i % ddr_size]) - -def get_product_id(): - """Return the mboard product ID (n310 or n300):""" - cmd = ['eeprom-id'] - output = subprocess.check_output( - cmd, - stderr=subprocess.STDOUT, - shell=True, - ).decode('utf-8') - if 'n310' in output: - return 'n310' - elif 'n300' in output: - return 'n300' - raise AssertionError("Cannot determine product ID.") - -def load_fpga_image(fpga_type): - """Load an FPGA image (HG, XG, AA, ...)""" - cmd = ['uhd_image_loader', '--args', 'type=n3xx', '--fpga-path'] - images_folder = '/usr/share/uhd/images/' - fpga_file_name = \ - 'usrp_' + get_product_id() + '_fpga_' + fpga_type.upper() + '.bit' - fpga_image = images_folder + fpga_file_name - cmd.append(fpga_image) - cmd_str = ' '.join(cmd) - subprocess.check_output( - cmd_str, - stderr=subprocess.STDOUT, - shell=True - ) - ############################################################################## # main ############################################################################## diff --git a/mpm/python/usrp_mpm/bist.py b/mpm/python/usrp_mpm/bist.py new file mode 100644 index 000000000..b16fefa35 --- /dev/null +++ b/mpm/python/usrp_mpm/bist.py @@ -0,0 +1,598 @@ +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Utilities to write BIST executables for USRPs +""" + +import os +import re +import sys +import time +import json +import select +import socket +from datetime import datetime +import argparse +import subprocess +from six import iteritems + +############################################################################## +# Aurora/SFP BIST code +############################################################################## +def get_sfp_bist_defaults(): + " Default dictionary for SFP/Aurora BIST dry-runs " + return { + 'elapsed_time': 1.0, + 'max_roundtrip_latency': 0.8e-6, + 'throughput': 1000e6, + 'max_ber': 8.5e-11, + 'errors': 0, + 'bits': 12012486656, + } + +def assert_aurora_image(master, slave, device_args, product_id, aurora_image_type='AA'): + """ + Make sure we have an FPGA image with which we can run the requested tests. + + Will load an AA image if not, which always satisfies all conditions for + running Aurora tests. + """ + from usrp_mpm.sys_utils import uio + if not uio.find_uio_device(master)[0] or \ + (slave is not None and not uio.find_uio_device(slave)[0]): + load_fpga_image( + fpga_type=aurora_image_type, + device_args=device_args, + product_id=product_id, + ) + +def aurora_results_to_status(bist_results): + """ + Convert a dictionary coming from AuroraControl BIST to one that we can use + for this BIST + """ + return bist_results['mst_errors'] == 0, { + 'elapsed_time': bist_results['time_elapsed'], + 'max_roundtrip_latency': bist_results['mst_latency_us'], + 'throughput': bist_results['approx_throughput'], + 'max_ber': bist_results['max_ber'], + 'errors': bist_results['mst_errors'], + 'bits': bist_results['mst_samps'], + } + +def run_aurora_bist( + device_args, + product_id, + master, + slave=None, + requested_rate=1300*8e6, + aurora_image_type='AA'): + """ + Spawn a BER test + """ + from usrp_mpm import aurora_control + from usrp_mpm.sys_utils.uio import open_uio + + class DummyContext(object): + """Dummy class for context managers""" + def __enter__(self): + return + + def __exit__(self, exc_type, exc_value, traceback): + return exc_type is None + + # Go, go, go! + try: + assert_aurora_image(master, slave, device_args, product_id, aurora_image_type) + with open_uio(label=master, read_only=False) as master_au_uio: + master_au_ctrl = aurora_control.AuroraControl(master_au_uio) + with open_uio(label=slave, read_only=False)\ + if slave is not None else DummyContext() as slave_au_uio: + slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\ + if slave is not None else None + return master_au_ctrl.run_ber_loopback_bist( + duration=10, + requested_rate=requested_rate, + slave=slave_au_ctrl, + ) + except Exception as ex: + print("Unexpected exception: {}".format(str(ex))) + exit(1) + + +############################################################################## +# Helpers +############################################################################## +def post_results(results): + """ + Given a dictionary, post the results. + + This will print the results as JSON to stdout. + """ + print(json.dumps( + results, + sort_keys=True, + indent=4, + separators=(',', ': ') + )) + +def sock_read_line(my_sock, timeout=60, interval=0.1): + """ + Read from a socket until newline. If there was no newline until the timeout + occurs, raise an error. Otherwise, return the line. + """ + line = b'' + end_time = time.time() + timeout + while time.time() < end_time: + socket_ready = select.select([my_sock], [], [], 0)[0] + if socket_ready: + next_char = my_sock.recv(1) + if next_char == b'\n': + return line.decode('ascii') + line += next_char + else: + time.sleep(interval) + raise RuntimeError("sock_read_line() exceeded read timeout!") + +def poll_with_timeout(state_check, timeout_ms, interval_ms): + """ + Calls state_check() every interval_ms until it returns a positive value, or + until a timeout is exceeded. + + Returns True if state_check() returned True within the timeout. + """ + max_time = time.time() + (float(timeout_ms) / 1000) + interval_s = float(interval_ms) / 1000 + while time.time() < max_time: + if state_check(): + return True + time.sleep(interval_s) + return False + +def expand_options(option_list): + """ + Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', + 'spam': 'eggs'}. + """ + return dict(x.split('=') for x in option_list) + +def load_fpga_image( + fpga_type, + device_args, + product_id, + images_folder='/usr/share/uhd/images/'): + """Load an FPGA image (1G, XG, AA, ...)""" + # cmd = ['uhd_image_loader', '--args', 'type=e3xx,addr=127.0.0.1', '--fpga-path'] + fpga_file_name = \ + 'usrp_' + product_id + '_fpga_' + fpga_type.upper() + '.bit' + fpga_image = images_folder + fpga_file_name + cmd = [ + 'uhd_image_loader', + '--args', device_args, + '--fpga-path', fpga_image + ] + cmd_str = ' '.join(cmd) + subprocess.check_output( + cmd_str, + stderr=subprocess.STDOUT, + shell=True + ) + +def filter_results_for_lv(results, lv_compat_format): + """ + The LabView JSON parser does not support a variety of things, such as + nested dicts, and some downstream LV applications freak out if certain keys + are not what they expect. + This is a long hard-coded list of how results should look like for those + cases. Note: This list needs manual supervision and attention for the case + where either subsystems get renamed, or other architectural changes should + occur. + """ + def fixup_dict(result_dict, ref_dict): + """ + Touches up result_dict according to ref_dict by the following rules: + - If a key is in result_dict that is not in ref_dict, delete that + - If a key is in ref_dict that is not in result_dict, use the value + from ref_dict + """ + ref_dict['error_msg'] = "" + ref_dict['status'] = False + result_dict = { + k: v for k, v in iteritems(result_dict) + if k in ref_dict or k in ('error_msg', 'status') + } + result_dict = { + k: result_dict.get(k, ref_dict[k]) for k in ref_dict + } + return result_dict + # GoGoGo + results = { + testname: fixup_dict(testresults, lv_compat_format[testname]) \ + if testname in lv_compat_format else testresults + for testname, testresults in iteritems(results) + } + return results + +def get_product_id_from_eeprom(valid_ids): + """Return the mboard product ID + + Returns something like n300, n310, e320... + """ + cmd = ['eeprom-id'] + output = subprocess.check_output( + cmd, + stderr=subprocess.STDOUT, + shell=True, + ).decode('utf-8') + for valid_id in valid_ids: + if valid_id in output: + return valid_id + raise AssertionError("Cannot determine product ID.: `{}'".format(output)) + +def get_tpm_caps_info(): + """Read 'caps' info from TPM subsystem""" + result = {} + props_to_read = ('caps',) + base_path = '/sys/class/tpm' + for tpm_device in os.listdir(base_path): + if tpm_device.startswith('tpm'): + for key in props_to_read: + result['{}_{}'.format(tpm_device, key)] = open( + os.path.join(base_path, tpm_device, key), 'r' + ).read().strip() + return result + +def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): + """Helper function for set gpio. + What this function do is take decimal value and convert to a binary string + then try to set those individual bits to the gpio_bank. + Arguments: + gpio_bank -- gpio bank type. + value -- value to set onto gpio bank. + gpio_size -- size of the gpio bank + ddr_mask -- data direction register bit mask. 0 is input; 1 is output. + """ + ddr_size = bin(ddr_mask).count("1") + value_bitstring = \ + ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] + ddr_bitstring = \ + ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] + for i in range(gpio_size): + if ddr_bitstring[gpio_size - 1 - i] == "1": + gpio_bank.set(i, value_bitstring[i % ddr_size]) + +############################################################################## +# Common tests +############################################################################## +def test_ddr3_with_usrp_probe(): + """ + Run uhd_usrp_probe and scrape the output to see if the DRAM FIFO block is + reporting a good throughput. This is a bit of a roundabout way of testing + the DDR3, but it uses existing software and also tests the RFNoC pathways. + """ + result = {} + ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' + try: + output = subprocess.check_output( + ddr3_bist_executor, + stderr=subprocess.STDOUT, + shell=True, + ) + except subprocess.CalledProcessError as ex: + # Don't throw errors from uhd_usrp_probe + output = ex.output + output = output.decode("utf-8") + mobj = re.search(r"Throughput: (?P[0-9.]+)\s?MB", output) + if mobj is not None: + result['throughput'] = float(mobj.group('thrup')) * 1000 + else: + result['throughput'] = 0 + result['error_msg'] = result.get('error_msg', '') + \ + "\n\nFailed match throughput regex!" + return result + + +def get_gpsd_tpv_result(): + """ + Query gpsd via a socket and return the corresponding JSON result as a + dictionary. + """ + my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + my_sock.connect(('localhost', 2947)) + sys.stderr.write("Connected to GPSDO socket.\n") + query_cmd = b'?WATCH={"enable":true,"json":true}' + my_sock.sendall(query_cmd) + sys.stderr.write("Sent query: {}\n".format(query_cmd)) + sock_read_line(my_sock, timeout=10) + sys.stderr.write("Received initial newline.\n") + result = {} + while result.get('class', None) != 'TPV': + json_result = sock_read_line(my_sock, timeout=60) + sys.stderr.write( + "Received JSON response: {}\n\n".format(json_result) + ) + result = json.loads(json_result) + my_sock.sendall(b'?WATCH={"enable":false}') + my_sock.close() + return result + +def get_ref_clock_prop(clock_source, time_source, extra_args=None): + """ + Helper function to determine reference clock lock + Description: Checks to see if we can lock to a clock source. + The actual value is yanked from the property tree. + + External Equipment: None + Return dictionary: + - : + - locked: Boolean lock status + """ + extra_args = extra_args or {} + result = {} + extra_args_str = ",".join( + ['{k}={v}'.format(k=k, v=v) for k, v in iteritems(extra_args)]) + cmd = [ + 'uhd_usrp_probe', + '--args', + 'addr=127.0.0.1,clock_source={c},time_source={t},{e}'.format( + c=clock_source, t=time_source, e=extra_args_str), + '--sensor' + ] + sensor_path = '/mboards/0/sensors/ref_locked' + cmd.append(sensor_path) + ref_lock_executor = ' '.join(cmd) + try: + output = subprocess.check_output( + ref_lock_executor, + stderr=subprocess.PIPE, + shell=True, + ) + except subprocess.CalledProcessError as ex: + # Don't throw errors from uhd_usrp_probe + output = ex.output + output = output.decode("utf-8").strip() + mobj = re.search(r"true$", output) + if mobj is not None: + result['ref_locked'] = True + else: + result['ref_locked'] = False + result['error_msg'] = "Reference Clock not locked: " + output + return result + +def get_temp_sensor_value(temp_sensor_map): + """ + Read a temp sensor value from the system and return a dictionary of the + form {temp_sensor_lookup(device): $temp} + """ + import pyudev + context = pyudev.Context() + return { + temp_sensor_map(device): \ + int(device.attributes.get('temp').decode('ascii')) + for device in context.list_devices(subsystem='thermal') + if 'temp' in device.attributes.available_attributes \ + and device.attributes.get('temp') is not None + } + +def get_fan_values(): + """ + Return a dict of fan name -> fan speed key/values. + """ + import pyudev + context = pyudev.Context() + return { + device.sys_name: int(device.attributes.get('cur_state')) + for device in context.list_devices(subsystem='thermal') + if 'cur_state' in device.attributes.available_attributes \ + and device.attributes.get('cur_state') is not None + } + +def get_link_up(if_name): + """ + Return a dictionary {if_name: IFLA_OPERSTATE} + """ + from pyroute2 import IPRoute + result = {} + with IPRoute() as ipr: + links = ipr.link_lookup(ifname=if_name) + if not links: + return {'error_msg': "No interface found"} + link_info = next(iter(ipr.get_links(links)), None) + if link_info is None: + return {'error_msg': "Error on get_links for sfp0"} + result['sfp0'] = link_info.get_attr('IFLA_OPERSTATE') + if result['sfp0'] != 'UP': + result['error_msg'] = "Link not up for interface" + return result + + + +############################################################################## +# BIST class +############################################################################## +class UsrpBIST(object): + """ + BIST parent class + """ + usrp_type = None + default_rev = 3 # Because why not + # This defines special tests that are really collections of other tests. + collections = { + 'standard': ["rtc",], + 'extended': "*", + } + # Default FPGA image type + DEFAULT_FPGA_TYPE = None + lv_compat_format = None + device_args = 'addr=127.0.0.1' + + def make_arg_parser(self): + """ + Return arg parser + """ + parser = argparse.ArgumentParser( + description="{} BIST Tool".format(self.usrp_type), + ) + parser.add_argument( + '-n', '--dry-run', action='store_true', + help="Fake out the tests. All tests will return a valid" \ + " response, but will not actually interact with hardware.", + ) + parser.add_argument( + '-v', '--verbose', action='store_true', + help="Crank up verbosity level", + ) + parser.add_argument( + '--debug', action='store_true', + help="For debugging this tool.", + ) + parser.add_argument( + '--option', '-o', action='append', default=[], + help="Option for individual test.", + ) + parser.add_argument( + '--lv-compat', action='store_true', + help="Provides compatibility with the LV JSON parser. Don't run " + "this mode unless you know what you're doing. The JSON " + "output does not necessarily reflect the actual system " + "status when using this mode.", + ) + parser.add_argument( + '--skip-fpga-reload', action='store_true', + help="Skip reloading the default FPGA image post-test. Note: by" + "specifying this argument, the FPGA image loaded could be " + "anything post-test.", + ) + parser.add_argument( + 'tests', + help="List the tests that should be run", + nargs='+', # There has to be at least one + ) + return parser + + def get_mb_periph_mgr(self): + """Needs to be implemented by child class""" + raise NotImplementedError + + def get_product_id(self): + """Needs to be implemented by child class""" + raise NotImplementedError + + def __init__(self): + assert self.DEFAULT_FPGA_TYPE is not None + assert self.device_args is not None + assert self.usrp_type is not None + assert self.lv_compat_format is not None + self.args = self.make_arg_parser().parse_args() + self.args.option = expand_options(self.args.option) + # If this is true, trigger a reload of the default FPGA image + self.reload_fpga_image = False + try: + default_rev = self.get_mb_periph_mgr().mboard_max_rev + except ImportError: + # This means we're in dry run mode or something like that, so just + # pick something + default_rev = self.default_rev + self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) + self.tests_to_run = set() + for test in self.args.tests: + if test in self.collections: + for this_test in self.expand_collection(test): + self.tests_to_run.add(this_test) + else: + self.tests_to_run.add(test) + try: + # Keep this import here so we can do dry-runs without any MPM code + from usrp_mpm import get_main_logger + if not self.args.verbose: + from usrp_mpm.mpmlog import WARNING + get_main_logger().setLevel(WARNING) + self.log = get_main_logger().getChild('main') + except ImportError: + print("No logging capability available.") + + def expand_collection(self, coll): + """ + Return names of tests in a collection + """ + tests = self.collections[coll] + if tests == "*": + tests = {x.replace('bist_', '') + for x in dir(self) + if x.find('bist_') == 0 + } + else: + tests = set(tests) + return tests + + def run(self): + """ + Execute tests. + + Returns True on Success. + """ + def execute_test(testname): + """ + Actually run a test. + """ + testmethod_name = "bist_{0}".format(testname) + sys.stderr.write( + "Executing test method: {0}\n\n".format(testmethod_name) + ) + if not hasattr(self, testmethod_name): + sys.stderr.write("Test not defined: `{}`\n".format(testname)) + return False, {} + try: + status, data = getattr(self, testmethod_name)() + data['status'] = status + data['error_msg'] = data.get('error_msg', '') + return status, data + except Exception as ex: + sys.stderr.write( + "Test {} failed to execute: {}\n".format(testname, str(ex)) + ) + if self.args.debug: + raise + return False, {'error_msg': str(ex)} + tests_successful = True + result = {} + for test in self.tests_to_run: + status, result_data = execute_test(test) + tests_successful = tests_successful and status + result[test] = result_data + if self.args.lv_compat: + result = filter_results_for_lv(result, self.lv_compat_format) + post_results(result) + if self.reload_fpga_image and not self.args.skip_fpga_reload: + load_fpga_image( + self.DEFAULT_FPGA_TYPE, + self.device_args, + self.get_product_id(), + ) + return tests_successful + +############################################################################# +# BISTS +# All bist_* methods must return True/False success values! +############################################################################# + def bist_rtc(self): + """ + BIST for RTC (real time clock) + + Return dictionary: + - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 + format, as a string. As if running 'date -Iseconds -u'. + - time: Same time, but in seconds since epoch. + + Return status: + Unless datetime throws an exception, returns True. + """ + assert 'rtc' in self.tests_to_run + utc_now = datetime.utcnow() + return True, { + 'time': time.mktime(utc_now.timetuple()), + 'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", + } -- cgit v1.2.3