diff options
author | Sugandha Gupta <sugandha.gupta@ettus.com> | 2018-05-17 16:11:45 -0700 |
---|---|---|
committer | Brent Stapleton <bstapleton@g.hmc.edu> | 2018-07-18 15:37:27 -0700 |
commit | 59b4e8f89271dd2878c94a06e42c5e798c0ce812 (patch) | |
tree | 9c4207e9a823c3d0b75cab39210f634699ba87d4 | |
parent | 1e4649ce296e1af376e1864576ab207b1b30fc41 (diff) | |
download | uhd-59b4e8f89271dd2878c94a06e42c5e798c0ce812.tar.gz uhd-59b4e8f89271dd2878c94a06e42c5e798c0ce812.tar.bz2 uhd-59b4e8f89271dd2878c94a06e42c5e798c0ce812.zip |
mpm: e320_bist: Add tests for running BIST on E320
-rwxr-xr-x | mpm/python/e320_bist | 769 |
1 files changed, 769 insertions, 0 deletions
diff --git a/mpm/python/e320_bist b/mpm/python/e320_bist new file mode 100755 index 000000000..8ee5b9b91 --- /dev/null +++ b/mpm/python/e320_bist @@ -0,0 +1,769 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +E320 Built-In Self Test (BIST) + +""" + +from __future__ import print_function +import os +import sys +import subprocess +import re +import socket +import select +import time +import json +from datetime import datetime +import argparse +from six import iteritems + +# Timeout values are in seconds: +GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute" +GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test + # does not necessarily require GPS lock to pass, we + # reduce this value in order for the BIST to pass faster + # by default. + +############################################################################## +# Aurora/SFP BIST code +############################################################################## +def get_sfp_bist_defaults(): + " Default dictionary for SFP/Aurora BIST dry-runs " + return { + 'elapsed_time': 1.0, + 'max_roundtrip_latency': 0.8e-6, + 'throughput': 1000e6, + 'max_ber': 8.5e-11, + 'errors': 0, + 'bits': 12012486656, + } + +def run_aurora_bist(master, slave=None): + """ + Spawn a BER test + """ + from usrp_mpm import aurora_control + from usrp_mpm.sys_utils.uio import UIO + try: + master_au_uio = UIO(label=master, read_only=False) + master_au_uio.open() + master_au_ctrl = aurora_control.AuroraControl(master_au_uio) + if slave is not None: + slave_au_uio = UIO(label=slave, read_only=False) + slave_au_uio.open() + slave_au_ctrl = None if slave is None else aurora_control.AuroraControl( + slave_au_uio + ) + return master_au_ctrl.run_ber_loopback_bist( + duration=10, + requested_rate=1300 * 8e6, + slave=slave_au_ctrl, + ) + except Exception as ex: + print("Unexpected exception: {}".format(str(ex))) + exit(1) + finally: + master_au_uio.close() + if slave is not None: + slave_au_uio.close() + +def aurora_results_to_status(bist_results): + """ + Convert a dictionary coming from AuroraControl BIST to one that we can use + for this BIST + """ + return bist_results['mst_errors'] == 0, { + 'elapsed_time': bist_results['time_elapsed'], + 'max_roundtrip_latency': bist_results['mst_latency_us'], + 'throughput': bist_results['approx_throughput'], + 'max_ber': bist_results['max_ber'], + 'errors': bist_results['mst_errors'], + 'bits': bist_results['mst_samps'], + } + +############################################################################## +# Helpers +############################################################################## +def post_results(results): + """ + Given a dictionary, post the results. + + This will print the results as JSON to stdout. + """ + print(json.dumps( + results, + sort_keys=True, + indent=4, + separators=(',', ': ') + )) + +def filter_results_for_lv(results): + """ + The LabView JSON parser does not support a variety of things, such as + nested dicts, and some downstream LV applications freak out if certain keys + are not what they expect. + This is a long hard-coded list of how results should look like for those + cases. Note: This list needs manual supervision and attention for the case + where either subsystems get renamed, or other architectural changes should + occur. + """ + lv_compat_format = { + 'ddr3': { + 'throughput': -1, + }, + 'gpsdo': { + "class": "", + "time": "", + "ept": -1, + "lat": -1, + "lon": -1, + "alt": -1, + "epx": -1, + "epy": -1, + "epv": -1, + "track": -1, + "speed": -1, + "climb": -1, + "eps": -1, + "mode": -1, + }, + 'tpm': { + 'tpm0_caps': "", + }, + 'sfp_loopback': { + 'elapsed_time': -1, + 'max_roundtrip_latency': -1, + 'throughput': -1, + 'max_ber': -1, + 'errors': -1, + 'bits': -1, + }, + 'gpio': { + 'write_patterns': [], + 'read_patterns': [], + }, + 'temp': { + 'fpga-thermal-zone': -1, + }, + 'fan': { + 'cooling_device0': -1, + 'cooling_device1': -1, + }, + } + # OK now go and brush up the results: + def fixup_dict(result_dict, ref_dict): + """ + Touches up result_dict according to ref_dict by the following rules: + - If a key is in result_dict that is not in ref_dict, delete that + - If a key is in ref_dict that is not in result_dict, use the value + from ref_dict + """ + ref_dict['error_msg'] = "" + ref_dict['status'] = False + result_dict = { + k: v for k, v in iteritems(result_dict) + if k in ref_dict or k in ('error_msg', 'status') + } + result_dict = { + k: result_dict.get(k, ref_dict[k]) for k in ref_dict + } + return result_dict + results = { + testname: fixup_dict(testresults, lv_compat_format[testname]) \ + if testname in lv_compat_format else testresults + for testname, testresults in iteritems(results) + } + return results + +def sock_read_line(my_sock, timeout=60, interval=0.1): + """ + Read from a socket until newline. If there was no newline until the timeout + occurs, raise an error. Otherwise, return the line. + """ + line = b'' + end_time = time.time() + timeout + while time.time() < end_time: + socket_ready = select.select([my_sock], [], [], 0)[0] + if socket_ready: + next_char = my_sock.recv(1) + if next_char == b'\n': + return line.decode('ascii') + line += next_char + else: + time.sleep(interval) + raise RuntimeError("sock_read_line() exceeded read timeout!") + +def poll_with_timeout(state_check, timeout_ms, interval_ms): + """ + Calls state_check() every interval_ms until it returns a positive value, or + until a timeout is exceeded. + + Returns True if state_check() returned True within the timeout. + """ + max_time = time.time() + (float(timeout_ms) / 1000) + interval_s = float(interval_ms) / 1000 + while time.time() < max_time: + if state_check(): + return True + time.sleep(interval_s) + return False + +def expand_options(option_list): + """ + Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar', + 'spam': 'eggs'}. + """ + return dict(x.split('=') for x in option_list) + +############################################################################## +# Bist class +############################################################################## +class E320BIST(object): + """ + BIST Tool for the USRP E320 + """ + # This defines special tests that are really collections of other tests. + collections = { + 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", "ref_clock_int"], + 'extended': "*", + } + + @staticmethod + def make_arg_parser(): + """ + Return arg parser + """ + parser = argparse.ArgumentParser( + description="E320 BIST Tool", + ) + parser.add_argument( + '-n', '--dry-run', action='store_true', + help="Fake out the tests. All tests will return a valid" \ + " response, but will not actually interact with hardware.", + ) + parser.add_argument( + '-v', '--verbose', action='store_true', + help="Crank up verbosity level", + ) + parser.add_argument( + '--debug', action='store_true', + help="For debugging this tool.", + ) + parser.add_argument( + '--option', '-o', action='append', default=[], + help="Option for individual test.", + ) + parser.add_argument( + '--lv-compat', action='store_true', + help="Provides compatibility with the LV JSON parser. Don't run " + "this mode unless you know what you're doing. The JSON " + "output does not necessarily reflect the actual system " + "status when using this mode.", + ) + parser.add_argument( + 'tests', + help="List the tests that should be run", + nargs='+', # There has to be at least one + ) + return parser + + def __init__(self): + self.args = E320BIST.make_arg_parser().parse_args() + self.args.option = expand_options(self.args.option) + try: + from usrp_mpm.periph_manager.e320 import e320 + default_rev = e320.mboard_max_rev + except ImportError: + # This means we're in dry run mode or something like that, so just + # pick something + default_rev = 3 + self.mb_rev = int(self.args.option.get('mb_rev', default_rev)) + self.tests_to_run = set() + for test in self.args.tests: + if test in self.collections: + for test in self.expand_collection(test): + self.tests_to_run.add(test) + else: + self.tests_to_run.add(test) + try: + # Keep this import here so we can do dry-runs without any MPM code + from usrp_mpm import get_main_logger + if not self.args.verbose: + from usrp_mpm.mpmlog import WARNING + get_main_logger().setLevel(WARNING) + self.log = get_main_logger().getChild('main') + except ImportError: + print("No logging capability available.") + + def expand_collection(self, coll): + """ + Return names of tests in a collection + """ + tests = self.collections[coll] + if tests == "*": + tests = {x.replace('bist_', '') + for x in dir(self) + if x.find('bist_') == 0 + } + else: + tests = set(tests) + return tests + + def run(self): + """ + Execute tests. + + Returns True on Success. + """ + def execute_test(testname): + """ + Actually run a test. + """ + testmethod_name = "bist_{0}".format(testname) + sys.stderr.write( + "Executing test method: {0}\n\n".format(testmethod_name) + ) + try: + status, data = getattr(self, testmethod_name)() + data['status'] = status + data['error_msg'] = data.get('error_msg', '') + return status, data + except AttributeError: + sys.stderr.write("Test not defined: {}\n".format(testname)) + return False, {} + except Exception as ex: + sys.stderr.write( + "Test {} failed to execute: {}\n".format(testname, str(ex)) + ) + if self.args.debug: + raise + return False, {'error_msg': str(ex)} + tests_successful = True + result = {} + for test in self.tests_to_run: + status, result_data = execute_test(test) + tests_successful = tests_successful and status + result[test] = result_data + if self.args.lv_compat: + result = filter_results_for_lv(result) + post_results(result) + return tests_successful + +############################################################################# +# BISTS +# All bist_* methods must return True/False success values! +############################################################################# + def bist_rtc(self): + """ + BIST for RTC (real time clock) + + Return dictionary: + - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601 + format, as a string. As if running 'date -Iseconds -u'. + - time: Same time, but in seconds since epoch. + + Return status: + Unless datetime throws an exception, returns True. + """ + assert 'rtc' in self.tests_to_run + utc_now = datetime.utcnow() + return True, { + 'time': time.mktime(utc_now.timetuple()), + 'date': utc_now.replace(microsecond=0).isoformat() + "+00:00", + } + + def bist_gyro(self): + """ + BIST for GYRO (MPU9250) + + Return dictionary: + + Return status: True if MPU9250 is detected. + """ + assert 'gyro' in self.tests_to_run + if self.args.dry_run: + return True, {'device_name': 'dry_run mpu9250'} + import pyudev + context = pyudev.Context() + result = { + 'device_name': device.get('OF_NAME') + for device in context.list_devices(subsystem='iio', DEVTYPE='iio_device') + if 'mpu9250' in device.get('OF_NAME') is not None + } + if len(result) < 1: + result['error_msg'] = "No GYRO detected!" + return 'error_msg' not in result, result + + + def bist_ddr3(self): + """ + BIST for PL DDR3 DRAM + Description: Calls a test to examine the speed of the DDR3. To be + precise, it fires up a UHD session, which runs a DDR3 BiST internally. + If that works, it'll return estimated throughput that was gathered + during the DDR3 BiST. + + External Equipment: None + + Return dictionary: + - throughput: The estimated throughput in bytes/s + + Return status: + True if the DDR3 bist passed + """ + assert 'ddr3' in self.tests_to_run + if self.args.dry_run: + return True, {'throughput': 1250e6} + result = {} + ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1' + try: + output = subprocess.check_output( + ddr3_bist_executor, + stderr=subprocess.STDOUT, + shell=True, + ) + except subprocess.CalledProcessError as ex: + # Don't throw errors from uhd_usrp_probe + output = ex.output + output = output.decode("utf-8") + mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output) + if mobj is not None: + result['throughput'] = float(mobj.group('thrup')) * 1000 + else: + result['throughput'] = 0 + result['error_msg'] = result.get('error_msg', '') + \ + "\n\nFailed match throughput regex!" + return result.get('throughput', 0) > 1000e3, result + + def bist_gpsdo(self): + """ + BIST for GPSDO + Description: Returns GPS information + External Equipment: None; Recommend attaching an antenna or providing + fake GPS information + + Return dictionary: A TPV dictionary as returned by gpsd. + See also: http://www.catb.org/gpsd/gpsd_json.html + + Check for mode 2 or 3 to see if it's locked. + """ + assert 'gpsdo' in self.tests_to_run + if self.args.dry_run: + return True, { + "class": "TPV", + "time": "2014-30T11:48:20.10Z", + "ept": 0.005, + "lat": 30.407899, + "lon": -97.726634, + "alt": 1327.689, + "epx": 15.319, + "epy": 17.054, + "epv": 124.484, + "track": 10.3797, + "speed": 0.091, + "climb": -0.085, + "eps": 34.11, + "mode": 3 + } + from usrp_mpm.periph_manager import e320 + # Turn on GPS, give some time to acclimatize + print("Entered gpsdo test") + mb_regs = e320.MboardRegsControl(e320.e320.mboard_regs_label, self.log) + mb_regs.enable_gps(True) + time.sleep(5) + gps_warmup_timeout = float( + self.args.option.get('gps_warmup_timeout', GPS_WARMUP_TIMEOUT)) + gps_lockok_timeout = float( + self.args.option.get('gps_lockok_timeout', GPS_LOCKOK_TIMEOUT)) + # Wait for WARMUP to go low + sys.stderr.write( + "Waiting for WARMUP to go low for up to {} seconds...\n".format( + gps_warmup_timeout)) + if not poll_with_timeout( + lambda: not bool((mb_regs.get_gps_status() >> 4) & 0x1), + gps_warmup_timeout*1000, 1000 + ): + raise RuntimeError( + "GPS-WARMUP did not go low within {} seconds!".format( + gps_warmup_timeout)) + sys.stderr.write("Chip is warmed up.\n") + # Wait for LOCKOK. Data sheet says wait up to 15 minutes for GPS lock. + sys.stderr.write( + "Waiting for LOCKOK to go high for up to {} seconds...\n".format( + gps_lockok_timeout)) + if not poll_with_timeout( + mb_regs.get_gps_locked_val, + gps_lockok_timeout*1000, + 1000 + ): + sys.stderr.write("No GPS-LOCKOK!\n") + gps_status = mb_regs.get_gps_status() + sys.stderr.write("GPS-SURVEY status: {}\n".format( + (gps_status >> 3) & 0x1 + )) + sys.stderr.write("GPS-PHASELOCK status: {}\n".format( + (gps_status >> 2) & 0x1 + )) + sys.stderr.write("GPS-ALARM status: {}\n".format( + (gps_status >> 1) & 0x1 + )) + # Now read back response from chip + my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + my_sock.connect(('localhost', 2947)) + sys.stderr.write("Connected to GPSDO socket.\n") + query_cmd = b'?WATCH={"enable":true,"json":true}' + my_sock.sendall(query_cmd) + sys.stderr.write("Sent query: {}\n".format(query_cmd)) + sock_read_line(my_sock, timeout=10) + sys.stderr.write("Received initial newline.\n") + result = {} + while result.get('class', None) != 'TPV': + json_result = sock_read_line(my_sock, timeout=60) + sys.stderr.write( + "Received JSON response: {}\n\n".format(json_result) + ) + result = json.loads(json_result) + my_sock.sendall(b'?WATCH={"enable":false}') + my_sock.close() + # If we reach this line, we have a valid result and the chip responded. + # However, it doesn't necessarily mean we had a GPS lock. + return True, result + + def bist_tpm(self): + """ + BIST for TPM (Trusted Platform Module) + + This reads the caps value for all detected TPM devices. + + Return dictionary: + - tpm<N>_caps: TPM manufacturer and version info. Is a multi-line + string. + + Return status: True if exactly one TPM device is detected. + """ + assert 'tpm' in self.tests_to_run + if self.args.dry_run: + return True, { + 'tpm0_caps': "Fake caps value\n\nVersion 0.0.0", + } + result = {} + props_to_read = ('caps',) + base_path = '/sys/class/tpm' + for tpm_device in os.listdir(base_path): + if tpm_device.startswith('tpm'): + for key in props_to_read: + result['{}_{}'.format(tpm_device, key)] = open( + os.path.join(base_path, tpm_device, key), 'r' + ).read().strip() + return len(result) == 1, result + + def bist_ref_clock_int(self): + """ + BIST for clock lock from internal (20MHz). + Description: Checks to see if we can lock to an internal + clock source. + + External Equipment: None + Return dictionary: + - <sensor-name>: + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_int' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + # RevB only: Read register from FPGA for lock status. + result = {} + from usrp_mpm.periph_manager import e320, e320_periphs + mb_regs = e320_periphs.MboardRegsControl(e320.e320.mboard_regs_label, self.log) + mb_regs.set_clock_source('internal', 20e6) + time.sleep(5) + if mb_regs.get_refclk_lock(): + result = {'ref_locked': mb_regs.get_refclk_lock()} + if len(result) < 1: + result['error_msg'] = "Reference Clock not locked" + return 'error_msg' not in result, result + + def bist_ref_clock_ext(self): + """ + BIST for clock lock from external (10MHz) source. + Description: Checks to see if we can lock to an external + clock source. + + External Equipment: External 10 MHz reference clock needed (from Octoclock) + Return dictionary: + - <sensor-name>: + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_ext' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + # RevB only: Read register from FPGA for lock status. + result = {} + from usrp_mpm.periph_manager import e320, e320_periphs + mb_regs = e320_periphs.MboardRegsControl(e320.e320.mboard_regs_label, self.log) + mb_regs.set_clock_source('external', 10e6) + time.sleep(5) + if mb_regs.get_refclk_lock(): + result = {'ref_locked': mb_regs.get_refclk_lock()} + if len(result) < 1: + result['error_msg'] = "Reference Clock not locked" + return 'error_msg' not in result, result + + def bist_sfp_loopback(self): + """ + BIST for SFP+ ports: + Description: Uses one SFP+ port in loopback mode. + + External Equipment: Loopback module in SFP required + required. + + Return dictionary: + - elapsed_time: Float value, test time in seconds + - max_roundtrip_latency: Float value, max roundtrip latency in seconds + - throughput: Approximate data throughput in bytes/s + - max_ber: Estimated maximum BER, float value. + - errors: Number of errors + - bits: Number of bits that were transferred + """ + if self.args.dry_run: + return True, get_sfp_bist_defaults() + sfp_bist_results = run_aurora_bist(master='misc-auro-regs') + return aurora_results_to_status(sfp_bist_results) + + def bist_gpio(self): + """ + BIST for GPIO + Description: Writes and reads the values to the GPIO + + Needed Equipment: External loopback as follows + GPIO + 0<->4 + 1<->5 + 2<->6 + 3<->7 + + Return dictionary: + - write_patterns: A list of patterns that were written + - read_patterns: A list of patterns that were read back + """ + assert 'gpio' in self.tests_to_run + GPIO_WIDTH = 8 + patterns = range(16) + if self.args.dry_run: + return True, { + 'write_patterns': list(patterns), + 'read_patterns': list(patterns), + } + from usrp_mpm.periph_manager import e320, e320_periphs + mb_regs = e320_periphs.MboardRegsControl(e320.e320.mboard_regs_label, self.log) + mb_regs.enable_fp_gpio(True) + mb_regs.set_fp_gpio_voltage(2.5) + mb_regs.set_fp_gpio_master(0xFF) + # Allow some time for the front-panel GPIOs to become usable + time.sleep(1) + ddr1 = 0xf0 + ddr2 = 0x0f + def _run_gpio(ddr, patterns): + " Run a GPIO test for a given set of patterns " + gpio_ctrl = e320_periphs.FrontpanelGPIO(ddr) + for pattern in patterns: + gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr) + time.sleep(0.1) + gpio_rb = gpio_ctrl.get_all() + if pattern != gpio_rb: + return False, {'write_patterns': [pattern], + 'read_patterns': [gpio_rb]} + return True, {'write_patterns': list(patterns), + 'read_patterns': list(patterns)} + status, data = _run_gpio(ddr1, patterns) + if not status: + return status, data + status, data = _run_gpio(ddr2, patterns) + return status, data + + def bist_temp(self): + """ + BIST for temperature sensors + Description: Reads the temperature sensors on the motherboards and + returns their values in mC + + Return dictionary: + - <thermal-zone-name>: temp in mC + """ + assert 'temp' in self.tests_to_run + if self.args.dry_run: + return True, {'fpga-thermal-zone': 30000} + import pyudev + context = pyudev.Context() + result = { + device.attributes.get('type').decode('ascii'): \ + int(device.attributes.get('temp').decode('ascii')) + for device in context.list_devices(subsystem='thermal') + if 'temp' in device.attributes.available_attributes \ + and device.attributes.get('temp') is not None + } + if len(result) < 1: + result['error_msg'] = "No temperature sensors found!" + return 'error_msg' not in result, result + + def bist_fan(self): + """ + BIST for temperature sensors + Description: Reads the RPM values of the fans on the motherboard + + Return dictionary: + - <fan-name>: Fan speed in RPM + + External Equipment: None + """ + assert 'fan' in self.tests_to_run + if self.args.dry_run: + return True, {'cooling_device0': 10000, 'cooling_device1': 10000} + import pyudev + context = pyudev.Context() + result = { + device.sys_name: int(device.attributes.get('cur_state')) + for device in context.list_devices(subsystem='thermal') + if 'cur_state' in device.attributes.available_attributes \ + and device.attributes.get('cur_state') is not None + } + return len(result) == 2, result + + +def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask): + """Helper function for set gpio. + What this function do is take decimal value and convert to a binary string + then try to set those individual bits to the gpio_bank. + Arguments: + gpio_bank -- gpio bank type. + value -- value to set onto gpio bank. + gpio_size -- size of the gpio bank + ddr_mask -- data direction register bit mask. 0 is input; 1 is output. + """ + ddr_size = bin(ddr_mask).count("1") + value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):] + ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):] + for i in range(gpio_size): + if ddr_bitstring[gpio_size - 1 - i] == "1": + gpio_bank.set(i, value_bitstring[i % ddr_size]) + +############################################################################## +# main +############################################################################## +def main(): + " Go, go, go! " + return E320BIST().run() + +if __name__ == '__main__': + exit(not main()) |