aboutsummaryrefslogtreecommitdiffstats
path: root/mpm
diff options
context:
space:
mode:
Diffstat (limited to 'mpm')
-rwxr-xr-xmpm/python/e320_bist561
-rwxr-xr-xmpm/python/n3xx_bist697
-rw-r--r--mpm/python/usrp_mpm/bist.py598
3 files changed, 786 insertions, 1070 deletions
diff --git a/mpm/python/e320_bist b/mpm/python/e320_bist
index ef7c85980..ad5b2163a 100755
--- a/mpm/python/e320_bist
+++ b/mpm/python/e320_bist
@@ -6,21 +6,12 @@
#
"""
E320 Built-In Self Test (BIST)
-
"""
from __future__ import print_function
-import os
import sys
-import subprocess
-import re
-import socket
-import select
import time
-import json
-from datetime import datetime
-import argparse
-from six import iteritems
+from usrp_mpm import bist
# Timeout values are in seconds:
GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute"
@@ -29,113 +20,30 @@ GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test
# reduce this value in order for the BIST to pass faster
# by default.
# Temperature Sensor Mapping based on location
-temp_sensor_map = {
+TEMP_SENSOR_MAP = {
"thermal_zone0" : "internal",
"thermal_zone1" : "rf_channelA",
"thermal_zone2" : "fpga",
"thermal_zone3" : "rf_channelB",
"thermal_zone4" : "main_power"
}
-##############################################################################
-# Aurora/SFP BIST code
-##############################################################################
-def get_sfp_bist_defaults():
- " Default dictionary for SFP/Aurora BIST dry-runs "
- return {
- 'elapsed_time': 1.0,
- 'max_roundtrip_latency': 0.8e-6,
- 'throughput': 1000e6,
- 'max_ber': 8.5e-11,
- 'errors': 0,
- 'bits': 12012486656,
- }
-
-def assert_aurora_image(master, slave):
- """
- Make sure we have an FPGA image with which we can run the requested tests.
-
- Will load an AA image if not, which always satisfies all conditions for
- running Aurora tests.
- """
- from usrp_mpm.sys_utils import uio
- if not uio.find_uio_device(master)[0] or \
- (slave is not None and not uio.find_uio_device(slave)[0]):
- load_fpga_image('AA')
-
-def run_aurora_bist(master, slave=None):
- """
- Spawn a BER test
- """
- from usrp_mpm import aurora_control
- from usrp_mpm.sys_utils.uio import open_uio
-
- class DummyContext(object):
- """Dummy class for context managers"""
- def __enter__(self):
- return
-
- def __exit__(self, exc_type, exc_value, traceback):
- return exc_type is None
-
- # Go, go, go!
- try:
- assert_aurora_image(master, slave)
- with open_uio(label=master, read_only=False) as master_au_uio:
- master_au_ctrl = aurora_control.AuroraControl(master_au_uio)
- with open_uio(label=slave, read_only=False)\
- if slave is not None else DummyContext() as slave_au_uio:
- slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\
- if slave is not None else None
- return master_au_ctrl.run_ber_loopback_bist(
- duration=10,
- requested_rate=1300 * 8e6,
- slave=slave_au_ctrl,
- )
- except Exception as ex:
- print("Unexpected exception: {}".format(str(ex)))
- exit(1)
-
-
-def aurora_results_to_status(bist_results):
- """
- Convert a dictionary coming from AuroraControl BIST to one that we can use
- for this BIST
- """
- return bist_results['mst_errors'] == 0, {
- 'elapsed_time': bist_results['time_elapsed'],
- 'max_roundtrip_latency': bist_results['mst_latency_us'],
- 'throughput': bist_results['approx_throughput'],
- 'max_ber': bist_results['max_ber'],
- 'errors': bist_results['mst_errors'],
- 'bits': bist_results['mst_samps'],
- }
##############################################################################
-# Helpers
+# Bist class
##############################################################################
-def post_results(results):
- """
- Given a dictionary, post the results.
-
- This will print the results as JSON to stdout.
- """
- print(json.dumps(
- results,
- sort_keys=True,
- indent=4,
- separators=(',', ': ')
- ))
-
-def filter_results_for_lv(results):
+class E320BIST(bist.UsrpBIST):
"""
- The LabView JSON parser does not support a variety of things, such as
- nested dicts, and some downstream LV applications freak out if certain keys
- are not what they expect.
- This is a long hard-coded list of how results should look like for those
- cases. Note: This list needs manual supervision and attention for the case
- where either subsystems get renamed, or other architectural changes should
- occur.
+ BIST Tool for the USRP E320
"""
+ usrp_type = "E320"
+ # This defines special tests that are really collections of other tests.
+ collections = {
+ 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro",
+ "ref_clock_int"],
+ 'extended': "*",
+ }
+ # Default FPGA image type
+ DEFAULT_FPGA_TYPE = '1G'
lv_compat_format = {
'ddr3': {
'throughput': -1,
@@ -172,250 +80,35 @@ def filter_results_for_lv(results):
'read_patterns': [],
},
'temp': {
- temp_sensor_map['thermal_zone0']: -1,
- temp_sensor_map['thermal_zone1']: -1,
- temp_sensor_map['thermal_zone2']: -1,
- temp_sensor_map['thermal_zone3']: -1,
- temp_sensor_map['thermal_zone4']: -1,
+ TEMP_SENSOR_MAP['thermal_zone0']: -1,
+ TEMP_SENSOR_MAP['thermal_zone1']: -1,
+ TEMP_SENSOR_MAP['thermal_zone2']: -1,
+ TEMP_SENSOR_MAP['thermal_zone3']: -1,
+ TEMP_SENSOR_MAP['thermal_zone4']: -1,
},
'fan': {
'cooling_device0': -1,
},
}
- # OK now go and brush up the results:
- def fixup_dict(result_dict, ref_dict):
- """
- Touches up result_dict according to ref_dict by the following rules:
- - If a key is in result_dict that is not in ref_dict, delete that
- - If a key is in ref_dict that is not in result_dict, use the value
- from ref_dict
- """
- ref_dict['error_msg'] = ""
- ref_dict['status'] = False
- result_dict = {
- k: v for k, v in iteritems(result_dict)
- if k in ref_dict or k in ('error_msg', 'status')
- }
- result_dict = {
- k: result_dict.get(k, ref_dict[k]) for k in ref_dict
- }
- return result_dict
- results = {
- testname: fixup_dict(testresults, lv_compat_format[testname]) \
- if testname in lv_compat_format else testresults
- for testname, testresults in iteritems(results)
- }
- return results
-
-def sock_read_line(my_sock, timeout=60, interval=0.1):
- """
- Read from a socket until newline. If there was no newline until the timeout
- occurs, raise an error. Otherwise, return the line.
- """
- line = b''
- end_time = time.time() + timeout
- while time.time() < end_time:
- socket_ready = select.select([my_sock], [], [], 0)[0]
- if socket_ready:
- next_char = my_sock.recv(1)
- if next_char == b'\n':
- return line.decode('ascii')
- line += next_char
- else:
- time.sleep(interval)
- raise RuntimeError("sock_read_line() exceeded read timeout!")
-
-def poll_with_timeout(state_check, timeout_ms, interval_ms):
- """
- Calls state_check() every interval_ms until it returns a positive value, or
- until a timeout is exceeded.
-
- Returns True if state_check() returned True within the timeout.
- """
- max_time = time.time() + (float(timeout_ms) / 1000)
- interval_s = float(interval_ms) / 1000
- while time.time() < max_time:
- if state_check():
- return True
- time.sleep(interval_s)
- return False
-
-def expand_options(option_list):
- """
- Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar',
- 'spam': 'eggs'}.
- """
- return dict(x.split('=') for x in option_list)
+ device_args = "type=e3xx,addr=127.0.0.1"
-##############################################################################
-# Bist class
-##############################################################################
-class E320BIST(object):
- """
- BIST Tool for the USRP E320
- """
- # This defines special tests that are really collections of other tests.
- collections = {
- 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm", "gyro", "ref_clock_int"],
- 'extended': "*",
- }
- # Default FPGA image type
- DEFAULT_FPGA_TYPE = '1G'
+ def __init__(self):
+ super(E320BIST, self).__init__()
- @staticmethod
- def make_arg_parser():
- """
- Return arg parser
- """
- parser = argparse.ArgumentParser(
- description="E320 BIST Tool",
- )
- parser.add_argument(
- '-n', '--dry-run', action='store_true',
- help="Fake out the tests. All tests will return a valid" \
- " response, but will not actually interact with hardware.",
- )
- parser.add_argument(
- '-v', '--verbose', action='store_true',
- help="Crank up verbosity level",
- )
- parser.add_argument(
- '--debug', action='store_true',
- help="For debugging this tool.",
- )
- parser.add_argument(
- '--option', '-o', action='append', default=[],
- help="Option for individual test.",
- )
- parser.add_argument(
- '--lv-compat', action='store_true',
- help="Provides compatibility with the LV JSON parser. Don't run "
- "this mode unless you know what you're doing. The JSON "
- "output does not necessarily reflect the actual system "
- "status when using this mode.",
- )
- parser.add_argument(
- '--skip-fpga-reload', action='store_true',
- help="Skip reloading the default FPGA image post-test. Note: by"
- "specifying this argument, the FPGA image loaded could be "
- "anything post-test.",
- )
- parser.add_argument(
- 'tests',
- help="List the tests that should be run",
- nargs='+', # There has to be at least one
- )
- return parser
+ def get_mb_periph_mgr(self):
+ """Return reference to an e320 periph manager"""
+ from usrp_mpm.periph_manager.e320 import e320
+ return e320
- def __init__(self):
- self.args = E320BIST.make_arg_parser().parse_args()
- self.args.option = expand_options(self.args.option)
- # If this is true, trigger a reload of the default FPGA image
- self.reload_fpga_image = False
- try:
- from usrp_mpm.periph_manager.e320 import e320
- default_rev = e320.mboard_max_rev
- except ImportError:
- # This means we're in dry run mode or something like that, so just
- # pick something
- default_rev = 3
- self.mb_rev = int(self.args.option.get('mb_rev', default_rev))
- self.tests_to_run = set()
- for test in self.args.tests:
- if test in self.collections:
- for test in self.expand_collection(test):
- self.tests_to_run.add(test)
- else:
- self.tests_to_run.add(test)
- try:
- # Keep this import here so we can do dry-runs without any MPM code
- from usrp_mpm import get_main_logger
- if not self.args.verbose:
- from usrp_mpm.mpmlog import WARNING
- get_main_logger().setLevel(WARNING)
- self.log = get_main_logger().getChild('main')
- except ImportError:
- print("No logging capability available.")
-
- def expand_collection(self, coll):
- """
- Return names of tests in a collection
- """
- tests = self.collections[coll]
- if tests == "*":
- tests = {x.replace('bist_', '')
- for x in dir(self)
- if x.find('bist_') == 0
- }
- else:
- tests = set(tests)
- return tests
-
- def run(self):
- """
- Execute tests.
+ def get_product_id(self):
+ """Return the mboard product ID (e320):"""
+ return bist.get_product_id_from_eeprom(valid_ids=['e320'])
- Returns True on Success.
- """
- def execute_test(testname):
- """
- Actually run a test.
- """
- testmethod_name = "bist_{0}".format(testname)
- sys.stderr.write(
- "Executing test method: {0}\n\n".format(testmethod_name)
- )
- try:
- status, data = getattr(self, testmethod_name)()
- data['status'] = status
- data['error_msg'] = data.get('error_msg', '')
- return status, data
- except AttributeError:
- sys.stderr.write("Test not defined: {}\n".format(testname))
- return False, {}
- except Exception as ex:
- sys.stderr.write(
- "Test {} failed to execute: {}\n".format(testname, str(ex))
- )
- if self.args.debug:
- raise
- return False, {'error_msg': str(ex)}
- tests_successful = True
- result = {}
- for test in self.tests_to_run:
- status, result_data = execute_test(test)
- tests_successful = tests_successful and status
- result[test] = result_data
- if self.args.lv_compat:
- result = filter_results_for_lv(result)
- post_results(result)
- if self.reload_fpga_image and not self.args.skip_fpga_reload:
- load_fpga_image(self.DEFAULT_FPGA_TYPE)
- return tests_successful
#############################################################################
# BISTS
# All bist_* methods must return True/False success values!
#############################################################################
- def bist_rtc(self):
- """
- BIST for RTC (real time clock)
-
- Return dictionary:
- - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601
- format, as a string. As if running 'date -Iseconds -u'.
- - time: Same time, but in seconds since epoch.
-
- Return status:
- Unless datetime throws an exception, returns True.
- """
- assert 'rtc' in self.tests_to_run
- utc_now = datetime.utcnow()
- return True, {
- 'time': time.mktime(utc_now.timetuple()),
- 'date': utc_now.replace(microsecond=0).isoformat() + "+00:00",
- }
-
def bist_gyro(self):
"""
BIST for GYRO (MPU9250)
@@ -458,25 +151,7 @@ class E320BIST(object):
assert 'ddr3' in self.tests_to_run
if self.args.dry_run:
return True, {'throughput': 1250e6}
- result = {}
- ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1'
- try:
- output = subprocess.check_output(
- ddr3_bist_executor,
- stderr=subprocess.STDOUT,
- shell=True,
- )
- except subprocess.CalledProcessError as ex:
- # Don't throw errors from uhd_usrp_probe
- output = ex.output
- output = output.decode("utf-8")
- mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output)
- if mobj is not None:
- result['throughput'] = float(mobj.group('thrup')) * 1000
- else:
- result['throughput'] = 0
- result['error_msg'] = result.get('error_msg', '') + \
- "\n\nFailed match throughput regex!"
+ result = bist.test_ddr3_with_usrp_probe()
return result.get('throughput', 0) > 1000e3, result
def bist_gpsdo(self):
@@ -522,7 +197,7 @@ class E320BIST(object):
sys.stderr.write(
"Waiting for WARMUP to go low for up to {} seconds...\n".format(
gps_warmup_timeout))
- if not poll_with_timeout(
+ if not bist.poll_with_timeout(
lambda: not bool((mb_regs.get_gps_status() >> 4) & 0x1),
gps_warmup_timeout*1000, 1000
):
@@ -534,7 +209,7 @@ class E320BIST(object):
sys.stderr.write(
"Waiting for LOCKOK to go high for up to {} seconds...\n".format(
gps_lockok_timeout))
- if not poll_with_timeout(
+ if not bist.poll_with_timeout(
mb_regs.get_gps_locked_val,
gps_lockok_timeout*1000,
1000
@@ -550,24 +225,8 @@ class E320BIST(object):
sys.stderr.write("GPS-ALARM status: {}\n".format(
(gps_status >> 1) & 0x1
))
- # Now read back response from chip
- my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- my_sock.connect(('localhost', 2947))
- sys.stderr.write("Connected to GPSDO socket.\n")
- query_cmd = b'?WATCH={"enable":true,"json":true}'
- my_sock.sendall(query_cmd)
- sys.stderr.write("Sent query: {}\n".format(query_cmd))
- sock_read_line(my_sock, timeout=10)
- sys.stderr.write("Received initial newline.\n")
- result = {}
- while result.get('class', None) != 'TPV':
- json_result = sock_read_line(my_sock, timeout=60)
- sys.stderr.write(
- "Received JSON response: {}\n\n".format(json_result)
- )
- result = json.loads(json_result)
- my_sock.sendall(b'?WATCH={"enable":false}')
- my_sock.close()
+ # Now the chip is on, read back the TPV result
+ result = bist.get_gpsd_tpv_result()
# If we reach this line, we have a valid result and the chip responded.
# However, it doesn't necessarily mean we had a GPS lock.
return True, result
@@ -589,60 +248,9 @@ class E320BIST(object):
return True, {
'tpm0_caps': "Fake caps value\n\nVersion 0.0.0",
}
- result = {}
- props_to_read = ('caps',)
- base_path = '/sys/class/tpm'
- for tpm_device in os.listdir(base_path):
- if tpm_device.startswith('tpm'):
- for key in props_to_read:
- result['{}_{}'.format(tpm_device, key)] = open(
- os.path.join(base_path, tpm_device, key), 'r'
- ).read().strip()
+ result = bist.get_tpm_caps_info()
return len(result) == 1, result
- def ref_clock_helper(self,clock_source):
- """
- Helper function to determine reference clock lock
- Description: Checks to see if we can lock to a clock source.
-
- External Equipment: None
- Return dictionary:
- - <sensor-name>:
- - locked: Boolean lock status
- """
- assert clock_source in ("internal", "external"),\
- "Invalid clock source selected ({}). Valid choices: {}".format(
- clock_source, ("internal", "external"))
- if self.args.dry_run:
- return True, {'ref_locked': True}
- result = {}
- env = os.environ.copy()
- env['UHD_LOG_CONSOLE_LEVEL'] = 'error'
- cmd = ['uhd_usrp_probe', '--args', 'addr=127.0.0.1,clock_source=' + clock_source,
- '--sensor']
- sensor_path = '/mboards/0/sensors/ref_locked'
- cmd.append(sensor_path)
- ref_lock_executor = ' '.join(cmd)
- try:
- output = subprocess.check_output(
- ref_lock_executor,
- stderr=subprocess.STDOUT,
- env=env,
- shell=True,
- )
- except subprocess.CalledProcessError as ex:
- # Don't throw errors from uhd_usrp_probe
- output = ex.output
- output = output.decode("utf-8")
- mobj = re.search(r"true$", output.strip())
- if mobj is not None:
- result['ref_locked'] = True
- else:
- result['ref_locked'] = False
- result['error_msg'] = ("Reference Clock not locked."
- " Extra output:" + output)
- return result
-
def bist_ref_clock_int(self):
"""
BIST for clock lock from internal (20MHz).
@@ -658,7 +266,9 @@ class E320BIST(object):
need to be asserted.
"""
assert 'ref_clock_int' in self.tests_to_run
- result = self.ref_clock_helper('internal')
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop('internal', 'internal')
return 'error_msg' not in result, result
def bist_ref_clock_ext(self):
@@ -676,7 +286,9 @@ class E320BIST(object):
need to be asserted.
"""
assert 'ref_clock_ext' in self.tests_to_run
- result = self.ref_clock_helper('external')
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop('external', 'external')
return 'error_msg' not in result, result
def bist_sfp_loopback(self):
@@ -696,10 +308,14 @@ class E320BIST(object):
- bits: Number of bits that were transferred
"""
if self.args.dry_run:
- return True, get_sfp_bist_defaults()
- sfp_bist_results = run_aurora_bist(master='misc-auro-regs')
+ return True, bist.get_sfp_bist_defaults()
+ sfp_bist_results = bist.run_aurora_bist(
+ device_args=self.device_args,
+ product_id=self.get_product_id(),
+ master='misc-auro-regs',
+ )
self.reload_fpga_image = True
- return aurora_results_to_status(sfp_bist_results)
+ return bist.aurora_results_to_status(sfp_bist_results)
def bist_gpio(self):
"""
@@ -737,7 +353,7 @@ class E320BIST(object):
" Run a GPIO test for a given set of patterns "
gpio_ctrl = e320_periphs.FrontpanelGPIO(ddr)
for pattern in patterns:
- gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr)
+ bist.gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr)
time.sleep(0.1)
gpio_rb = gpio_ctrl.get_all()
if pattern != gpio_rb:
@@ -763,16 +379,8 @@ class E320BIST(object):
assert 'temp' in self.tests_to_run
if self.args.dry_run:
return True, {'internal': 30000}
- import pyudev
- context = pyudev.Context()
- result = {
- temp_sensor_map[device.sys_name]: \
- int(device.attributes.get('temp').decode('ascii'))
- for device in context.list_devices(subsystem='thermal')
-
- if 'temp' in device.attributes.available_attributes \
- and device.attributes.get('temp') is not None
- }
+ result = bist.get_temp_sensor_value(
+ lambda device: TEMP_SENSOR_MAP[device.sys_name])
if len(result) < 1:
result['error_msg'] = "No temperature sensors found!"
return 'error_msg' not in result, result
@@ -790,14 +398,7 @@ class E320BIST(object):
assert 'fan' in self.tests_to_run
if self.args.dry_run:
return True, {'cooling_device0': 10000}
- import pyudev
- context = pyudev.Context()
- result = {
- device.sys_name: int(device.attributes.get('cur_state'))
- for device in context.list_devices(subsystem='thermal')
- if 'cur_state' in device.attributes.available_attributes \
- and device.attributes.get('cur_state') is not None
- }
+ result = bist.get_fan_values()
return len(result) == 1, result
def bist_link_up(self):
@@ -812,65 +413,9 @@ class E320BIST(object):
assert 'link_up' in self.tests_to_run
if self.args.dry_run:
return True, {'sfp0': 'UP'}
- from pyroute2 import IPRoute
- result = {}
- with IPRoute() as ipr:
- links = ipr.link_lookup(ifname='sfp0')
- if not links:
- return False, {'error_msg': "No interface found"}
- link_info = next(iter(ipr.get_links(links)), None)
- if link_info == None:
- return False, {'error_msg': "Error on get_links for sfp0"}
- result['sfp0'] = link_info.get_attr('IFLA_OPERSTATE')
- if result['sfp0'] != 'UP':
- result['error_msg'] = "Link not up for interface"
+ result = bist.get_link_up('sfp0')
return 'error_msg' not in result, result
-
-def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask):
- """Helper function for set gpio.
- What this function do is take decimal value and convert to a binary string
- then try to set those individual bits to the gpio_bank.
- Arguments:
- gpio_bank -- gpio bank type.
- value -- value to set onto gpio bank.
- gpio_size -- size of the gpio bank
- ddr_mask -- data direction register bit mask. 0 is input; 1 is output.
- """
- ddr_size = bin(ddr_mask).count("1")
- value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):]
- ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):]
- for i in range(gpio_size):
- if ddr_bitstring[gpio_size - 1 - i] == "1":
- gpio_bank.set(i, value_bitstring[i % ddr_size])
-
-def get_product_id():
- """Return the mboard product ID (e320):"""
- cmd = ['eeprom-id']
- output = subprocess.check_output(
- cmd,
- stderr=subprocess.STDOUT,
- shell=True,
- ).decode('utf-8')
- if 'e320' in output:
- return 'e320'
- raise AssertionError("Cannot determine product ID.")
-
-def load_fpga_image(fpga_type):
- """Load an FPGA image (1G, XG, AA, ...)"""
- cmd = ['uhd_image_loader', '--args', 'type=e3xx,addr=127.0.0.1', '--fpga-path']
- images_folder = '/usr/share/uhd/images/'
- fpga_file_name = \
- 'usrp_' + get_product_id() + '_fpga_' + fpga_type.upper() + '.bit'
- fpga_image = images_folder + fpga_file_name
- cmd.append(fpga_image)
- cmd_str = ' '.join(cmd)
- subprocess.check_output(
- cmd_str,
- stderr=subprocess.STDOUT,
- shell=True
- )
-
##############################################################################
# main
##############################################################################
diff --git a/mpm/python/n3xx_bist b/mpm/python/n3xx_bist
index 18f1c3b4b..cc7dd40c6 100755
--- a/mpm/python/n3xx_bist
+++ b/mpm/python/n3xx_bist
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright 2017 Ettus Research, National Instruments Company
+# Copyright 2017-2018 Ettus Research, National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
@@ -11,17 +11,9 @@ Will work on all derivatives of the N3xx series.
"""
from __future__ import print_function
-import os
import sys
-import subprocess
-import re
-import socket
-import select
import time
-import json
-from datetime import datetime
-import argparse
-from six import iteritems
+from usrp_mpm import bist
# Timeout values are in seconds:
GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute"
@@ -31,105 +23,20 @@ GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test
# by default.
##############################################################################
-# Aurora/SFP BIST code
+# Bist class
##############################################################################
-def get_sfp_bist_defaults():
- " Default dictionary for SFP/Aurora BIST dry-runs "
- return {
- 'elapsed_time': 1.0,
- 'max_roundtrip_latency': 0.8e-6,
- 'throughput': 1000e6,
- 'max_ber': 8.5e-11,
- 'errors': 0,
- 'bits': 12012486656,
- }
-
-def assert_aurora_image(master, slave):
- """
- Make sure we have an FPGA image with which we can run the requested tests.
-
- Will load an AA image if not, which always satisfies all conditions for
- running Aurora tests.
- """
- from usrp_mpm.sys_utils import uio
- if not uio.find_uio_device(master)[0] or \
- (slave is not None and not uio.find_uio_device(slave)[0]):
- load_fpga_image('AA')
-
-def run_aurora_bist(master, slave=None):
+class N3XXBIST(bist.UsrpBIST):
"""
- Spawn a BER test
- """
- from usrp_mpm import aurora_control
- from usrp_mpm.sys_utils.uio import open_uio
-
- class DummyContext(object):
- """Dummy class for context managers"""
- def __enter__(self):
- return
-
- def __exit__(self, exc_type, exc_value, traceback):
- return exc_type is None
-
- # Go, go, go!
- try:
- assert_aurora_image(master, slave)
- with open_uio(label=master, read_only=False) as master_au_uio:
- master_au_ctrl = aurora_control.AuroraControl(master_au_uio)
- with open_uio(label=slave, read_only=False)\
- if slave is not None else DummyContext() as slave_au_uio:
- slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\
- if slave is not None else None
- return master_au_ctrl.run_ber_loopback_bist(
- duration=10,
- requested_rate=1300 * 8e6,
- slave=slave_au_ctrl,
- )
- except Exception as ex:
- print("Unexpected exception: {}".format(str(ex)))
- exit(1)
-
-
-def aurora_results_to_status(bist_results):
- """
- Convert a dictionary coming from AuroraControl BIST to one that we can use
- for this BIST
+ BIST Tool for the USRP N3xx series
"""
- return bist_results['mst_errors'] == 0, {
- 'elapsed_time': bist_results['time_elapsed'],
- 'max_roundtrip_latency': bist_results['mst_latency_us'],
- 'throughput': bist_results['approx_throughput'],
- 'max_ber': bist_results['max_ber'],
- 'errors': bist_results['mst_errors'],
- 'bits': bist_results['mst_samps'],
+ usrp_type = "N3XX"
+ # This defines special tests that are really collections of other tests.
+ collections = {
+ 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm"],
+ 'extended': "*",
}
-
-##############################################################################
-# Helpers
-##############################################################################
-def post_results(results):
- """
- Given a dictionary, post the results.
-
- This will print the results as JSON to stdout.
- """
- print(json.dumps(
- results,
- sort_keys=True,
- indent=4,
- separators=(',', ': ')
- ))
-
-def filter_results_for_lv(results):
- """
- The LabView JSON parser does not support a variety of things, such as
- nested dicts, and some downstream LV applications freak out if certain keys
- are not what they expect.
- This is a long hard-coded list of how results should look like for those
- cases. Note: This list needs manual supervision and attention for the case
- where either subsystems get renamed, or other architectural changes should
- occur.
- """
+ # Default FPGA image type
+ DEFAULT_FPGA_TYPE = 'HG'
lv_compat_format = {
'ddr3': {
'throughput': -1,
@@ -184,240 +91,24 @@ def filter_results_for_lv(results):
'lock_status': 0,
},
}
- # OK now go and brush up the results:
- def fixup_dict(result_dict, ref_dict):
- """
- Touches up result_dict according to ref_dict by the following rules:
- - If a key is in result_dict that is not in ref_dict, delete that
- - If a key is in ref_dict that is not in result_dict, use the value
- from ref_dict
- """
- ref_dict['error_msg'] = ""
- ref_dict['status'] = False
- result_dict = {
- k: v for k, v in iteritems(result_dict)
- if k in ref_dict or k in ('error_msg', 'status')
- }
- result_dict = {
- k: result_dict.get(k, ref_dict[k]) for k in ref_dict
- }
- return result_dict
- results = {
- testname: fixup_dict(testresults, lv_compat_format[testname]) \
- if testname in lv_compat_format else testresults
- for testname, testresults in iteritems(results)
- }
- return results
-
-def sock_read_line(my_sock, timeout=60, interval=0.1):
- """
- Read from a socket until newline. If there was no newline until the timeout
- occurs, raise an error. Otherwise, return the line.
- """
- line = b''
- end_time = time.time() + timeout
- while time.time() < end_time:
- socket_ready = select.select([my_sock], [], [], 0)[0]
- if socket_ready:
- next_char = my_sock.recv(1)
- if next_char == b'\n':
- return line.decode('ascii')
- line += next_char
- else:
- time.sleep(interval)
- raise RuntimeError("sock_read_line() exceeded read timeout!")
-
-def poll_with_timeout(state_check, timeout_ms, interval_ms):
- """
- Calls state_check() every interval_ms until it returns a positive value, or
- until a timeout is exceeded.
-
- Returns True if state_check() returned True within the timeout.
- """
- max_time = time.time() + (float(timeout_ms) / 1000)
- interval_s = float(interval_ms) / 1000
- while time.time() < max_time:
- if state_check():
- return True
- time.sleep(interval_s)
- return False
-
-def expand_options(option_list):
- """
- Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar',
- 'spam': 'eggs'}.
- """
- return dict(x.split('=') for x in option_list)
-
-##############################################################################
-# Bist class
-##############################################################################
-class N3XXBIST(object):
- """
- BIST Tool for the USRP N3xx series
- """
- # This defines special tests that are really collections of other tests.
- collections = {
- 'standard': ["ddr3", "gpsdo", "rtc", "temp", "fan", "tpm"],
- 'extended': "*",
- }
- # Default FPGA image type
- DEFAULT_FPGA_TYPE = 'HG'
-
- @staticmethod
- def make_arg_parser():
- """
- Return arg parser
- """
- parser = argparse.ArgumentParser(
- description="N3xx BIST Tool",
- )
- parser.add_argument(
- '-n', '--dry-run', action='store_true',
- help="Fake out the tests. All tests will return a valid" \
- " response, but will not actually interact with hardware.",
- )
- parser.add_argument(
- '-v', '--verbose', action='store_true',
- help="Crank up verbosity level",
- )
- parser.add_argument(
- '--debug', action='store_true',
- help="For debugging this tool.",
- )
- parser.add_argument(
- '--option', '-o', action='append', default=[],
- help="Option for individual test.",
- )
- parser.add_argument(
- '--lv-compat', action='store_true',
- help="Provides compatibility with the LV JSON parser. Don't run "
- "this mode unless you know what you're doing. The JSON "
- "output does not necessarily reflect the actual system "
- "status when using this mode.",
- )
- parser.add_argument(
- '--skip-fpga-reload', action='store_true',
- help="Skip reloading the default FPGA image post-test. Note: by"
- "specifying this argument, the FPGA image loaded could be "
- "anything post-test.",
- )
- parser.add_argument(
- 'tests',
- help="List the tests that should be run",
- nargs='+', # There has to be at least one
- )
- return parser
+ device_args = "type=n3xx,addr=127.0.0.1"
def __init__(self):
- self.args = N3XXBIST.make_arg_parser().parse_args()
- self.args.option = expand_options(self.args.option)
- # If this is true, trigger a reload of the default FPGA image
- self.reload_fpga_image = False
- try:
- from usrp_mpm.periph_manager.n3xx import n3xx
- default_rev = n3xx.mboard_max_rev
- except ImportError:
- # This means we're in dry run mode or something like that, so just
- # pick something
- default_rev = 3
- self.mb_rev = int(self.args.option.get('mb_rev', default_rev))
- self.tests_to_run = set()
- for test in self.args.tests:
- if test in self.collections:
- for test in self.expand_collection(test):
- self.tests_to_run.add(test)
- else:
- self.tests_to_run.add(test)
- try:
- # Keep this import here so we can do dry-runs without any MPM code
- from usrp_mpm import get_main_logger
- if not self.args.verbose:
- from usrp_mpm.mpmlog import WARNING
- get_main_logger().setLevel(WARNING)
- self.log = get_main_logger().getChild('main')
- except ImportError:
- print("No logging capability available.")
-
- def expand_collection(self, coll):
- """
- Return names of tests in a collection
- """
- tests = self.collections[coll]
- if tests == "*":
- tests = {x.replace('bist_', '')
- for x in dir(self)
- if x.find('bist_') == 0
- }
- else:
- tests = set(tests)
- return tests
-
- def run(self):
- """
- Execute tests.
+ bist.UsrpBIST.__init__(self)
- Returns True on Success.
- """
- def execute_test(testname):
- """
- Actually run a test.
- """
- testmethod_name = "bist_{0}".format(testname)
- sys.stderr.write(
- "Executing test method: {0}\n\n".format(testmethod_name)
- )
- try:
- status, data = getattr(self, testmethod_name)()
- data['status'] = status
- data['error_msg'] = data.get('error_msg', '')
- return status, data
- except AttributeError:
- sys.stderr.write("Test not defined: {}\n".format(testname))
- return False, {}
- except Exception as ex:
- sys.stderr.write(
- "Test {} failed to execute: {}\n".format(testname, str(ex))
- )
- if self.args.debug:
- raise
- return False, {'error_msg': str(ex)}
- tests_successful = True
- result = {}
- for test in self.tests_to_run:
- status, result_data = execute_test(test)
- tests_successful = tests_successful and status
- result[test] = result_data
- if self.args.lv_compat:
- result = filter_results_for_lv(result)
- post_results(result)
- if self.reload_fpga_image and not self.args.skip_fpga_reload:
- load_fpga_image(self.DEFAULT_FPGA_TYPE)
- return tests_successful
+ def get_mb_periph_mgr(self):
+ """Return reference to an n3xx periph manager"""
+ from usrp_mpm.periph_manager.n3xx import n3xx
+ return n3xx
+
+ def get_product_id(self):
+ """Return the mboard product ID (n310 or n300):"""
+ return bist.get_product_id_from_eeprom(valid_ids=['n300', 'n310'])
#############################################################################
# BISTS
# All bist_* methods must return True/False success values!
#############################################################################
- def bist_rtc(self):
- """
- BIST for RTC (real time clock)
-
- Return dictionary:
- - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601
- format, as a string. As if running 'date -Iseconds -u'.
- - time: Same time, but in seconds since epoch.
-
- Return status:
- Unless datetime throws an exception, returns True.
- """
- assert 'rtc' in self.tests_to_run
- utc_now = datetime.utcnow()
- return True, {
- 'time': time.mktime(utc_now.timetuple()),
- 'date': utc_now.replace(microsecond=0).isoformat() + "+00:00",
- }
-
def bist_ddr3(self):
"""
BIST for PL DDR3 DRAM
@@ -437,25 +128,7 @@ class N3XXBIST(object):
assert 'ddr3' in self.tests_to_run
if self.args.dry_run:
return True, {'throughput': 1250e6}
- result = {}
- ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1'
- try:
- output = subprocess.check_output(
- ddr3_bist_executor,
- stderr=subprocess.STDOUT,
- shell=True,
- )
- except subprocess.CalledProcessError as ex:
- # Don't throw errors from uhd_usrp_probe
- output = ex.output
- output = output.decode("utf-8")
- mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output)
- if mobj is not None:
- result['throughput'] = float(mobj.group('thrup')) * 1000
- else:
- result['throughput'] = 0
- result['error_msg'] = result.get('error_msg', '') + \
- "\n\nFailed match throughput regex!"
+ result = bist.test_ddr3_with_usrp_probe()
return result.get('throughput', 0) > 1000e3, result
def bist_gpsdo(self):
@@ -501,7 +174,7 @@ class N3XXBIST(object):
sys.stderr.write(
"Waiting for WARMUP to go low for up to {} seconds...\n".format(
gps_warmup_timeout))
- if not poll_with_timeout(
+ if not bist.poll_with_timeout(
lambda: not gpio_tca6424.get('GPS-WARMUP'),
gps_warmup_timeout*1000, 1000
):
@@ -513,7 +186,7 @@ class N3XXBIST(object):
sys.stderr.write(
"Waiting for LOCKOK to go high for up to {} seconds...\n".format(
gps_lockok_timeout))
- if not poll_with_timeout(
+ if not bist.poll_with_timeout(
lambda: gpio_tca6424.get('GPS-LOCKOK'),
gps_lockok_timeout*1000,
1000
@@ -528,24 +201,8 @@ class N3XXBIST(object):
sys.stderr.write("GPS-ALARM status: {}\n".format(
gpio_tca6424.get('GPS-ALARM')
))
- # Now read back response from chip
- my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- my_sock.connect(('localhost', 2947))
- sys.stderr.write("Connected to GPSDO socket.\n")
- query_cmd = b'?WATCH={"enable":true,"json":true}'
- my_sock.sendall(query_cmd)
- sys.stderr.write("Sent query: {}\n".format(query_cmd))
- sock_read_line(my_sock, timeout=10)
- sys.stderr.write("Received initial newline.\n")
- result = {}
- while result.get('class', None) != 'TPV':
- json_result = sock_read_line(my_sock, timeout=60)
- sys.stderr.write(
- "Received JSON response: {}\n\n".format(json_result)
- )
- result = json.loads(json_result)
- my_sock.sendall(b'?WATCH={"enable":false}')
- my_sock.close()
+ # Now the chip is on, read back the TPV result
+ result = bist.get_gpsd_tpv_result()
# If we reach this line, we have a valid result and the chip responded.
# However, it doesn't necessarily mean we had a GPS lock.
return True, result
@@ -567,17 +224,87 @@ class N3XXBIST(object):
return True, {
'tpm0_caps': "Fake caps value\n\nVersion 0.0.0",
}
- result = {}
- props_to_read = ('caps',)
- base_path = '/sys/class/tpm'
- for tpm_device in os.listdir(base_path):
- if tpm_device.startswith('tpm'):
- for key in props_to_read:
- result['{}_{}'.format(tpm_device, key)] = open(
- os.path.join(base_path, tpm_device, key), 'r'
- ).read().strip()
+ result = bist.get_tpm_caps_info()
return len(result) == 1, result
+ def bist_ref_clock_int(self):
+ """
+ BIST for clock lock from internal (25 MHz) source.
+ Description: Checks to see if the daughtercard can lock to an internal
+ clock source.
+
+ External Equipment: None
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_int' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop(
+ 'internal',
+ 'internal',
+ extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0}
+ )
+ return 'error_msg' not in result, result
+
+ def bist_ref_clock_ext(self):
+ """
+ BIST for clock lock from external source. Note: This test requires a
+ connected daughterboard with a 'ref lock' sensor available.
+
+ Description: Checks to see if the daughtercard can lock to the external
+ reference clock.
+
+ External Equipment: 10 MHz reference Source connected to "ref in".
+
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_ext' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop(
+ 'external',
+ 'external',
+ extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0}
+ )
+ return 'error_msg' not in result, result
+
+ def bist_ref_clock_gpsdo(self):
+ """
+ BIST for clock lock from external source. Note: This test requires a
+ connected daughterboard with a 'ref lock' sensor available.
+
+ Description: Checks to see if the daughtercard can lock to the external
+ reference clock.
+
+ External Equipment: 10 MHz reference Source connected to "ref in".
+
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_gpsdo' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop(
+ 'gpsdo',
+ 'gpsdo',
+ extra_args={'skip_rfic': 1, 'rfnoc_num_blocks': 0}
+ )
+ return 'error_msg' not in result, result
+
def bist_sfp0_loopback(self):
"""
BIST for SFP+ ports:
@@ -596,10 +323,14 @@ class N3XXBIST(object):
- bits: Number of bits that were transferred
"""
if self.args.dry_run:
- return True, get_sfp_bist_defaults()
- sfp_bist_results = run_aurora_bist(master='misc-auro-regs0')
+ return True, bist.get_sfp_bist_defaults()
+ sfp_bist_results = bist.run_aurora_bist(
+ device_args=self.device_args,
+ product_id=self.get_product_id(),
+ master='misc-auro-regs0',
+ )
self.reload_fpga_image = True
- return aurora_results_to_status(sfp_bist_results)
+ return bist.aurora_results_to_status(sfp_bist_results)
def bist_sfp1_loopback(self):
"""
@@ -619,10 +350,14 @@ class N3XXBIST(object):
- bits: Number of bits that were transferred
"""
if self.args.dry_run:
- return True, get_sfp_bist_defaults()
- sfp_bist_results = run_aurora_bist(master='misc-auro-regs1')
+ return True, bist.get_sfp_bist_defaults()
+ sfp_bist_results = bist.run_aurora_bist(
+ device_args=self.device_args,
+ product_id=self.get_product_id(),
+ master='misc-auro-regs1',
+ )
self.reload_fpga_image = True
- return aurora_results_to_status(sfp_bist_results)
+ return bist.aurora_results_to_status(sfp_bist_results)
def bist_sfp_loopback(self):
"""
@@ -642,13 +377,15 @@ class N3XXBIST(object):
- bits: Number of bits that were transferred
"""
if self.args.dry_run:
- return True, get_sfp_bist_defaults()
- sfp_bist_results = run_aurora_bist(
+ return True, bist.get_sfp_bist_defaults()
+ sfp_bist_results = bist.run_aurora_bist(
+ device_args=self.device_args,
+ product_id=self.get_product_id(),
master='misc-auro-regs0',
slave='misc-auro-regs1',
)
self.reload_fpga_image = True
- return aurora_results_to_status(sfp_bist_results)
+ return bist.aurora_results_to_status(sfp_bist_results)
def bist_gpio(self):
"""
@@ -690,7 +427,7 @@ class N3XXBIST(object):
" Run a GPIO test for a given set of patterns "
gpio_ctrl = n3xx_periphs.FrontpanelGPIO(ddr)
for pattern in patterns:
- gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr)
+ bist.gpio_set_all(gpio_ctrl, pattern, GPIO_WIDTH, ddr)
time.sleep(0.1)
gpio_rb = gpio_ctrl.get_all()
if pattern != gpio_rb:
@@ -716,15 +453,8 @@ class N3XXBIST(object):
assert 'temp' in self.tests_to_run
if self.args.dry_run:
return True, {'fpga-thermal-zone': 30000}
- import pyudev
- context = pyudev.Context()
- result = {
- device.attributes.get('type').decode('ascii'): \
- int(device.attributes.get('temp').decode('ascii'))
- for device in context.list_devices(subsystem='thermal')
- if 'temp' in device.attributes.available_attributes \
- and device.attributes.get('temp') is not None
- }
+ result = bist.get_temp_sensor_value(
+ lambda device: device.attributes.get('type').decode('ascii'))
if len(result) < 1:
result['error_msg'] = "No temperature sensors found!"
return 'error_msg' not in result, result
@@ -742,14 +472,7 @@ class N3XXBIST(object):
assert 'fan' in self.tests_to_run
if self.args.dry_run:
return True, {'cooling_device0': 10000, 'cooling_device1': 10000}
- import pyudev
- context = pyudev.Context()
- result = {
- device.sys_name: int(device.attributes.get('cur_state'))
- for device in context.list_devices(subsystem='thermal')
- if 'cur_state' in device.attributes.available_attributes \
- and device.attributes.get('cur_state') is not None
- }
+ result = bist.get_fan_values()
return len(result) == 2, result
def bist_whiterabbit(self):
@@ -768,7 +491,11 @@ class N3XXBIST(object):
from usrp_mpm.sys_utils import uio
if not uio.find_uio_device(n3xx.wr_regs_label, logger=self.log)[0]:
self.log.info("Need to load WX image before proceeding...")
- load_fpga_image('WX')
+ bist.load_fpga_image(
+ 'WX',
+ self.device_args,
+ self.get_product_id(),
+ )
self.log.info("Image loading complete.")
self.reload_fpga_image = True
mb_regs = n3xx_periphs.MboardRegsControl(
@@ -776,7 +503,7 @@ class N3XXBIST(object):
mb_regs.set_time_source('sfp0', 25e6)
wr_regs_control = WhiteRabbitRegsControl(
n3xx.wr_regs_label, self.log)
- lock_status = poll_with_timeout(
+ lock_status = bist.poll_with_timeout(
lambda: wr_regs_control.get_time_lock_status(),
40000, # Try for x ms... this number is set from a few benchtop tests
1000, # Poll every... second! why not?
@@ -786,160 +513,6 @@ class N3XXBIST(object):
}
return lock_status, result
- def bist_ref_clock_int(self):
- """
- BIST for clock and pps lock from internal (25MHz).
- Description: Checks to see if we can lock to an internal
- clock source.
-
- External Equipment: None
- Return dictionary:
- - <sensor-name>:
- - locked: Boolean lock status
-
- There can be multiple ref lock sensors; for a pass condition they all
- need to be asserted.
- """
- assert 'ref_clock_int' in self.tests_to_run
- result = self.ref_clock_helper('internal', 'internal')
- return 'error_msg' not in result, result
-
- def bist_ref_clock_ext(self):
- """
- BIST for clock pps lock from external (10MHz) source.
- Description: Checks to see if we can lock to an external
- clock source.
-
- External Equipment: External 10 MHz reference clock needed (from Octoclock)
- Return dictionary:
- - <sensor-name>:
- - locked: Boolean lock status
-
- There can be multiple ref lock sensors; for a pass condition they all
- need to be asserted.
- """
- assert 'ref_clock_ext' in self.tests_to_run
- result = self.ref_clock_helper('external', 'external')
- return 'error_msg' not in result, result
-
- def bist_ref_clock_gpsdo(self):
- """
- BIST for clock and pps lock from gpsdo (20MHz) source.
- Description: Checks to see if we can lock to an external
- clock source.
-
- External Equipment: None
- Return dictionary:
- - <sensor-name>:
- - locked: Boolean lock status
-
- There can be multiple ref lock sensors; for a pass condition they all
- need to be asserted.
- """
- assert 'ref_clock_gpsdo' in self.tests_to_run
- result = self.ref_clock_helper('gpsdo', 'gpsdo')
- return 'error_msg' not in result, result
-
- def ref_clock_helper(self, clock_source, time_source):
- """
- Helper function to determine reference clock lock
- Description: Checks to see if we can lock to a clock source.
- External Equipment: None
- Return dictionary:
- - <sensor-name>:
- - locked: Boolean lock status
- """
- assert clock_source in ("gpsdo", "internal", "external"),\
- "Invalid clock source selected ({}). Valid choices: {}".format(
- clock_source, ("gpsdo", "internal", "external"))
-
- assert time_source in ("gpsdo", "internal", "external", "sfp0"),\
- "Invalid time source selected ({}). Valid choices: {}".format(
- time_source, ("gpsdo", "internal", "external", "sfp0"))
-
- if self.args.dry_run:
- return True, {'ref_locked': True}
- result = {}
- env = os.environ.copy()
- env['UHD_LOG_CONSOLE_LEVEL'] = 'error'
- cmd = ['uhd_usrp_probe', '--args',
- 'addr=127.0.0.1,'
- 'rfnoc_num_blocks=0,'
- 'skip_rfic=1,'
- 'clock_source={c},time_source={t}'.format(c=clock_source,
- t=time_source),
- '--sensor']
- sensor_path = '/mboards/0/sensors/ref_locked'
- cmd.append(sensor_path)
- ref_lock_executor = ' '.join(cmd)
- try:
- output = subprocess.check_output(
- ref_lock_executor,
- stderr=subprocess.STDOUT,
- env=env,
- shell=True,
- )
- except subprocess.CalledProcessError as ex:
- # Don't throw errors from uhd_usrp_probe
- output = ex.output
- output = output.decode("utf-8")
- mobj = re.search(r"true$", output.strip())
- if mobj is not None:
- result['ref_locked'] = True
- else:
- result['ref_locked'] = False
- result['error_msg'] = ("Reference Clock not locked."
- " Extra output:" + output)
- return result
-
-
-
-def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask):
- """Helper function for set gpio.
- What this function do is take decimal value and convert to a binary string
- then try to set those individual bits to the gpio_bank.
- Arguments:
- gpio_bank -- gpio bank type.
- value -- value to set onto gpio bank.
- gpio_size -- size of the gpio bank
- ddr_mask -- data direction register bit mask. 0 is input; 1 is output.
- """
- ddr_size = bin(ddr_mask).count("1")
- value_bitstring = ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):]
- ddr_bitstring = ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):]
- for i in range(gpio_size):
- if ddr_bitstring[gpio_size - 1 - i] == "1":
- gpio_bank.set(i, value_bitstring[i % ddr_size])
-
-def get_product_id():
- """Return the mboard product ID (n310 or n300):"""
- cmd = ['eeprom-id']
- output = subprocess.check_output(
- cmd,
- stderr=subprocess.STDOUT,
- shell=True,
- ).decode('utf-8')
- if 'n310' in output:
- return 'n310'
- elif 'n300' in output:
- return 'n300'
- raise AssertionError("Cannot determine product ID.")
-
-def load_fpga_image(fpga_type):
- """Load an FPGA image (HG, XG, AA, ...)"""
- cmd = ['uhd_image_loader', '--args', 'type=n3xx', '--fpga-path']
- images_folder = '/usr/share/uhd/images/'
- fpga_file_name = \
- 'usrp_' + get_product_id() + '_fpga_' + fpga_type.upper() + '.bit'
- fpga_image = images_folder + fpga_file_name
- cmd.append(fpga_image)
- cmd_str = ' '.join(cmd)
- subprocess.check_output(
- cmd_str,
- stderr=subprocess.STDOUT,
- shell=True
- )
-
##############################################################################
# main
##############################################################################
diff --git a/mpm/python/usrp_mpm/bist.py b/mpm/python/usrp_mpm/bist.py
new file mode 100644
index 000000000..b16fefa35
--- /dev/null
+++ b/mpm/python/usrp_mpm/bist.py
@@ -0,0 +1,598 @@
+#
+# Copyright 2018 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Utilities to write BIST executables for USRPs
+"""
+
+import os
+import re
+import sys
+import time
+import json
+import select
+import socket
+from datetime import datetime
+import argparse
+import subprocess
+from six import iteritems
+
+##############################################################################
+# Aurora/SFP BIST code
+##############################################################################
+def get_sfp_bist_defaults():
+ " Default dictionary for SFP/Aurora BIST dry-runs "
+ return {
+ 'elapsed_time': 1.0,
+ 'max_roundtrip_latency': 0.8e-6,
+ 'throughput': 1000e6,
+ 'max_ber': 8.5e-11,
+ 'errors': 0,
+ 'bits': 12012486656,
+ }
+
+def assert_aurora_image(master, slave, device_args, product_id, aurora_image_type='AA'):
+ """
+ Make sure we have an FPGA image with which we can run the requested tests.
+
+ Will load an AA image if not, which always satisfies all conditions for
+ running Aurora tests.
+ """
+ from usrp_mpm.sys_utils import uio
+ if not uio.find_uio_device(master)[0] or \
+ (slave is not None and not uio.find_uio_device(slave)[0]):
+ load_fpga_image(
+ fpga_type=aurora_image_type,
+ device_args=device_args,
+ product_id=product_id,
+ )
+
+def aurora_results_to_status(bist_results):
+ """
+ Convert a dictionary coming from AuroraControl BIST to one that we can use
+ for this BIST
+ """
+ return bist_results['mst_errors'] == 0, {
+ 'elapsed_time': bist_results['time_elapsed'],
+ 'max_roundtrip_latency': bist_results['mst_latency_us'],
+ 'throughput': bist_results['approx_throughput'],
+ 'max_ber': bist_results['max_ber'],
+ 'errors': bist_results['mst_errors'],
+ 'bits': bist_results['mst_samps'],
+ }
+
+def run_aurora_bist(
+ device_args,
+ product_id,
+ master,
+ slave=None,
+ requested_rate=1300*8e6,
+ aurora_image_type='AA'):
+ """
+ Spawn a BER test
+ """
+ from usrp_mpm import aurora_control
+ from usrp_mpm.sys_utils.uio import open_uio
+
+ class DummyContext(object):
+ """Dummy class for context managers"""
+ def __enter__(self):
+ return
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ return exc_type is None
+
+ # Go, go, go!
+ try:
+ assert_aurora_image(master, slave, device_args, product_id, aurora_image_type)
+ with open_uio(label=master, read_only=False) as master_au_uio:
+ master_au_ctrl = aurora_control.AuroraControl(master_au_uio)
+ with open_uio(label=slave, read_only=False)\
+ if slave is not None else DummyContext() as slave_au_uio:
+ slave_au_ctrl = aurora_control.AuroraControl(slave_au_uio)\
+ if slave is not None else None
+ return master_au_ctrl.run_ber_loopback_bist(
+ duration=10,
+ requested_rate=requested_rate,
+ slave=slave_au_ctrl,
+ )
+ except Exception as ex:
+ print("Unexpected exception: {}".format(str(ex)))
+ exit(1)
+
+
+##############################################################################
+# Helpers
+##############################################################################
+def post_results(results):
+ """
+ Given a dictionary, post the results.
+
+ This will print the results as JSON to stdout.
+ """
+ print(json.dumps(
+ results,
+ sort_keys=True,
+ indent=4,
+ separators=(',', ': ')
+ ))
+
+def sock_read_line(my_sock, timeout=60, interval=0.1):
+ """
+ Read from a socket until newline. If there was no newline until the timeout
+ occurs, raise an error. Otherwise, return the line.
+ """
+ line = b''
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ socket_ready = select.select([my_sock], [], [], 0)[0]
+ if socket_ready:
+ next_char = my_sock.recv(1)
+ if next_char == b'\n':
+ return line.decode('ascii')
+ line += next_char
+ else:
+ time.sleep(interval)
+ raise RuntimeError("sock_read_line() exceeded read timeout!")
+
+def poll_with_timeout(state_check, timeout_ms, interval_ms):
+ """
+ Calls state_check() every interval_ms until it returns a positive value, or
+ until a timeout is exceeded.
+
+ Returns True if state_check() returned True within the timeout.
+ """
+ max_time = time.time() + (float(timeout_ms) / 1000)
+ interval_s = float(interval_ms) / 1000
+ while time.time() < max_time:
+ if state_check():
+ return True
+ time.sleep(interval_s)
+ return False
+
+def expand_options(option_list):
+ """
+ Turn a list ['foo=bar', 'spam=eggs'] into a dictionary {'foo': 'bar',
+ 'spam': 'eggs'}.
+ """
+ return dict(x.split('=') for x in option_list)
+
+def load_fpga_image(
+ fpga_type,
+ device_args,
+ product_id,
+ images_folder='/usr/share/uhd/images/'):
+ """Load an FPGA image (1G, XG, AA, ...)"""
+ # cmd = ['uhd_image_loader', '--args', 'type=e3xx,addr=127.0.0.1', '--fpga-path']
+ fpga_file_name = \
+ 'usrp_' + product_id + '_fpga_' + fpga_type.upper() + '.bit'
+ fpga_image = images_folder + fpga_file_name
+ cmd = [
+ 'uhd_image_loader',
+ '--args', device_args,
+ '--fpga-path', fpga_image
+ ]
+ cmd_str = ' '.join(cmd)
+ subprocess.check_output(
+ cmd_str,
+ stderr=subprocess.STDOUT,
+ shell=True
+ )
+
+def filter_results_for_lv(results, lv_compat_format):
+ """
+ The LabView JSON parser does not support a variety of things, such as
+ nested dicts, and some downstream LV applications freak out if certain keys
+ are not what they expect.
+ This is a long hard-coded list of how results should look like for those
+ cases. Note: This list needs manual supervision and attention for the case
+ where either subsystems get renamed, or other architectural changes should
+ occur.
+ """
+ def fixup_dict(result_dict, ref_dict):
+ """
+ Touches up result_dict according to ref_dict by the following rules:
+ - If a key is in result_dict that is not in ref_dict, delete that
+ - If a key is in ref_dict that is not in result_dict, use the value
+ from ref_dict
+ """
+ ref_dict['error_msg'] = ""
+ ref_dict['status'] = False
+ result_dict = {
+ k: v for k, v in iteritems(result_dict)
+ if k in ref_dict or k in ('error_msg', 'status')
+ }
+ result_dict = {
+ k: result_dict.get(k, ref_dict[k]) for k in ref_dict
+ }
+ return result_dict
+ # GoGoGo
+ results = {
+ testname: fixup_dict(testresults, lv_compat_format[testname]) \
+ if testname in lv_compat_format else testresults
+ for testname, testresults in iteritems(results)
+ }
+ return results
+
+def get_product_id_from_eeprom(valid_ids):
+ """Return the mboard product ID
+
+ Returns something like n300, n310, e320...
+ """
+ cmd = ['eeprom-id']
+ output = subprocess.check_output(
+ cmd,
+ stderr=subprocess.STDOUT,
+ shell=True,
+ ).decode('utf-8')
+ for valid_id in valid_ids:
+ if valid_id in output:
+ return valid_id
+ raise AssertionError("Cannot determine product ID.: `{}'".format(output))
+
+def get_tpm_caps_info():
+ """Read 'caps' info from TPM subsystem"""
+ result = {}
+ props_to_read = ('caps',)
+ base_path = '/sys/class/tpm'
+ for tpm_device in os.listdir(base_path):
+ if tpm_device.startswith('tpm'):
+ for key in props_to_read:
+ result['{}_{}'.format(tpm_device, key)] = open(
+ os.path.join(base_path, tpm_device, key), 'r'
+ ).read().strip()
+ return result
+
+def gpio_set_all(gpio_bank, value, gpio_size, ddr_mask):
+ """Helper function for set gpio.
+ What this function do is take decimal value and convert to a binary string
+ then try to set those individual bits to the gpio_bank.
+ Arguments:
+ gpio_bank -- gpio bank type.
+ value -- value to set onto gpio bank.
+ gpio_size -- size of the gpio bank
+ ddr_mask -- data direction register bit mask. 0 is input; 1 is output.
+ """
+ ddr_size = bin(ddr_mask).count("1")
+ value_bitstring = \
+ ('{0:0' + str(ddr_size) + 'b}').format(value)[-(gpio_size):]
+ ddr_bitstring = \
+ ('{0:0' + str(gpio_size) + 'b}').format(ddr_mask)[-(gpio_size):]
+ for i in range(gpio_size):
+ if ddr_bitstring[gpio_size - 1 - i] == "1":
+ gpio_bank.set(i, value_bitstring[i % ddr_size])
+
+##############################################################################
+# Common tests
+##############################################################################
+def test_ddr3_with_usrp_probe():
+ """
+ Run uhd_usrp_probe and scrape the output to see if the DRAM FIFO block is
+ reporting a good throughput. This is a bit of a roundabout way of testing
+ the DDR3, but it uses existing software and also tests the RFNoC pathways.
+ """
+ result = {}
+ ddr3_bist_executor = 'uhd_usrp_probe --args addr=127.0.0.1'
+ try:
+ output = subprocess.check_output(
+ ddr3_bist_executor,
+ stderr=subprocess.STDOUT,
+ shell=True,
+ )
+ except subprocess.CalledProcessError as ex:
+ # Don't throw errors from uhd_usrp_probe
+ output = ex.output
+ output = output.decode("utf-8")
+ mobj = re.search(r"Throughput: (?P<thrup>[0-9.]+)\s?MB", output)
+ if mobj is not None:
+ result['throughput'] = float(mobj.group('thrup')) * 1000
+ else:
+ result['throughput'] = 0
+ result['error_msg'] = result.get('error_msg', '') + \
+ "\n\nFailed match throughput regex!"
+ return result
+
+
+def get_gpsd_tpv_result():
+ """
+ Query gpsd via a socket and return the corresponding JSON result as a
+ dictionary.
+ """
+ my_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ my_sock.connect(('localhost', 2947))
+ sys.stderr.write("Connected to GPSDO socket.\n")
+ query_cmd = b'?WATCH={"enable":true,"json":true}'
+ my_sock.sendall(query_cmd)
+ sys.stderr.write("Sent query: {}\n".format(query_cmd))
+ sock_read_line(my_sock, timeout=10)
+ sys.stderr.write("Received initial newline.\n")
+ result = {}
+ while result.get('class', None) != 'TPV':
+ json_result = sock_read_line(my_sock, timeout=60)
+ sys.stderr.write(
+ "Received JSON response: {}\n\n".format(json_result)
+ )
+ result = json.loads(json_result)
+ my_sock.sendall(b'?WATCH={"enable":false}')
+ my_sock.close()
+ return result
+
+def get_ref_clock_prop(clock_source, time_source, extra_args=None):
+ """
+ Helper function to determine reference clock lock
+ Description: Checks to see if we can lock to a clock source.
+ The actual value is yanked from the property tree.
+
+ External Equipment: None
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+ """
+ extra_args = extra_args or {}
+ result = {}
+ extra_args_str = ",".join(
+ ['{k}={v}'.format(k=k, v=v) for k, v in iteritems(extra_args)])
+ cmd = [
+ 'uhd_usrp_probe',
+ '--args',
+ 'addr=127.0.0.1,clock_source={c},time_source={t},{e}'.format(
+ c=clock_source, t=time_source, e=extra_args_str),
+ '--sensor'
+ ]
+ sensor_path = '/mboards/0/sensors/ref_locked'
+ cmd.append(sensor_path)
+ ref_lock_executor = ' '.join(cmd)
+ try:
+ output = subprocess.check_output(
+ ref_lock_executor,
+ stderr=subprocess.PIPE,
+ shell=True,
+ )
+ except subprocess.CalledProcessError as ex:
+ # Don't throw errors from uhd_usrp_probe
+ output = ex.output
+ output = output.decode("utf-8").strip()
+ mobj = re.search(r"true$", output)
+ if mobj is not None:
+ result['ref_locked'] = True
+ else:
+ result['ref_locked'] = False
+ result['error_msg'] = "Reference Clock not locked: " + output
+ return result
+
+def get_temp_sensor_value(temp_sensor_map):
+ """
+ Read a temp sensor value from the system and return a dictionary of the
+ form {temp_sensor_lookup(device): $temp}
+ """
+ import pyudev
+ context = pyudev.Context()
+ return {
+ temp_sensor_map(device): \
+ int(device.attributes.get('temp').decode('ascii'))
+ for device in context.list_devices(subsystem='thermal')
+ if 'temp' in device.attributes.available_attributes \
+ and device.attributes.get('temp') is not None
+ }
+
+def get_fan_values():
+ """
+ Return a dict of fan name -> fan speed key/values.
+ """
+ import pyudev
+ context = pyudev.Context()
+ return {
+ device.sys_name: int(device.attributes.get('cur_state'))
+ for device in context.list_devices(subsystem='thermal')
+ if 'cur_state' in device.attributes.available_attributes \
+ and device.attributes.get('cur_state') is not None
+ }
+
+def get_link_up(if_name):
+ """
+ Return a dictionary {if_name: IFLA_OPERSTATE}
+ """
+ from pyroute2 import IPRoute
+ result = {}
+ with IPRoute() as ipr:
+ links = ipr.link_lookup(ifname=if_name)
+ if not links:
+ return {'error_msg': "No interface found"}
+ link_info = next(iter(ipr.get_links(links)), None)
+ if link_info is None:
+ return {'error_msg': "Error on get_links for sfp0"}
+ result['sfp0'] = link_info.get_attr('IFLA_OPERSTATE')
+ if result['sfp0'] != 'UP':
+ result['error_msg'] = "Link not up for interface"
+ return result
+
+
+
+##############################################################################
+# BIST class
+##############################################################################
+class UsrpBIST(object):
+ """
+ BIST parent class
+ """
+ usrp_type = None
+ default_rev = 3 # Because why not
+ # This defines special tests that are really collections of other tests.
+ collections = {
+ 'standard': ["rtc",],
+ 'extended': "*",
+ }
+ # Default FPGA image type
+ DEFAULT_FPGA_TYPE = None
+ lv_compat_format = None
+ device_args = 'addr=127.0.0.1'
+
+ def make_arg_parser(self):
+ """
+ Return arg parser
+ """
+ parser = argparse.ArgumentParser(
+ description="{} BIST Tool".format(self.usrp_type),
+ )
+ parser.add_argument(
+ '-n', '--dry-run', action='store_true',
+ help="Fake out the tests. All tests will return a valid" \
+ " response, but will not actually interact with hardware.",
+ )
+ parser.add_argument(
+ '-v', '--verbose', action='store_true',
+ help="Crank up verbosity level",
+ )
+ parser.add_argument(
+ '--debug', action='store_true',
+ help="For debugging this tool.",
+ )
+ parser.add_argument(
+ '--option', '-o', action='append', default=[],
+ help="Option for individual test.",
+ )
+ parser.add_argument(
+ '--lv-compat', action='store_true',
+ help="Provides compatibility with the LV JSON parser. Don't run "
+ "this mode unless you know what you're doing. The JSON "
+ "output does not necessarily reflect the actual system "
+ "status when using this mode.",
+ )
+ parser.add_argument(
+ '--skip-fpga-reload', action='store_true',
+ help="Skip reloading the default FPGA image post-test. Note: by"
+ "specifying this argument, the FPGA image loaded could be "
+ "anything post-test.",
+ )
+ parser.add_argument(
+ 'tests',
+ help="List the tests that should be run",
+ nargs='+', # There has to be at least one
+ )
+ return parser
+
+ def get_mb_periph_mgr(self):
+ """Needs to be implemented by child class"""
+ raise NotImplementedError
+
+ def get_product_id(self):
+ """Needs to be implemented by child class"""
+ raise NotImplementedError
+
+ def __init__(self):
+ assert self.DEFAULT_FPGA_TYPE is not None
+ assert self.device_args is not None
+ assert self.usrp_type is not None
+ assert self.lv_compat_format is not None
+ self.args = self.make_arg_parser().parse_args()
+ self.args.option = expand_options(self.args.option)
+ # If this is true, trigger a reload of the default FPGA image
+ self.reload_fpga_image = False
+ try:
+ default_rev = self.get_mb_periph_mgr().mboard_max_rev
+ except ImportError:
+ # This means we're in dry run mode or something like that, so just
+ # pick something
+ default_rev = self.default_rev
+ self.mb_rev = int(self.args.option.get('mb_rev', default_rev))
+ self.tests_to_run = set()
+ for test in self.args.tests:
+ if test in self.collections:
+ for this_test in self.expand_collection(test):
+ self.tests_to_run.add(this_test)
+ else:
+ self.tests_to_run.add(test)
+ try:
+ # Keep this import here so we can do dry-runs without any MPM code
+ from usrp_mpm import get_main_logger
+ if not self.args.verbose:
+ from usrp_mpm.mpmlog import WARNING
+ get_main_logger().setLevel(WARNING)
+ self.log = get_main_logger().getChild('main')
+ except ImportError:
+ print("No logging capability available.")
+
+ def expand_collection(self, coll):
+ """
+ Return names of tests in a collection
+ """
+ tests = self.collections[coll]
+ if tests == "*":
+ tests = {x.replace('bist_', '')
+ for x in dir(self)
+ if x.find('bist_') == 0
+ }
+ else:
+ tests = set(tests)
+ return tests
+
+ def run(self):
+ """
+ Execute tests.
+
+ Returns True on Success.
+ """
+ def execute_test(testname):
+ """
+ Actually run a test.
+ """
+ testmethod_name = "bist_{0}".format(testname)
+ sys.stderr.write(
+ "Executing test method: {0}\n\n".format(testmethod_name)
+ )
+ if not hasattr(self, testmethod_name):
+ sys.stderr.write("Test not defined: `{}`\n".format(testname))
+ return False, {}
+ try:
+ status, data = getattr(self, testmethod_name)()
+ data['status'] = status
+ data['error_msg'] = data.get('error_msg', '')
+ return status, data
+ except Exception as ex:
+ sys.stderr.write(
+ "Test {} failed to execute: {}\n".format(testname, str(ex))
+ )
+ if self.args.debug:
+ raise
+ return False, {'error_msg': str(ex)}
+ tests_successful = True
+ result = {}
+ for test in self.tests_to_run:
+ status, result_data = execute_test(test)
+ tests_successful = tests_successful and status
+ result[test] = result_data
+ if self.args.lv_compat:
+ result = filter_results_for_lv(result, self.lv_compat_format)
+ post_results(result)
+ if self.reload_fpga_image and not self.args.skip_fpga_reload:
+ load_fpga_image(
+ self.DEFAULT_FPGA_TYPE,
+ self.device_args,
+ self.get_product_id(),
+ )
+ return tests_successful
+
+#############################################################################
+# BISTS
+# All bist_* methods must return True/False success values!
+#############################################################################
+ def bist_rtc(self):
+ """
+ BIST for RTC (real time clock)
+
+ Return dictionary:
+ - date: Returns the current UTC time, with seconds-accuracy, in ISO 8601
+ format, as a string. As if running 'date -Iseconds -u'.
+ - time: Same time, but in seconds since epoch.
+
+ Return status:
+ Unless datetime throws an exception, returns True.
+ """
+ assert 'rtc' in self.tests_to_run
+ utc_now = datetime.utcnow()
+ return True, {
+ 'time': time.mktime(utc_now.timetuple()),
+ 'date': utc_now.replace(microsecond=0).isoformat() + "+00:00",
+ }