aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/x4xx_bist
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/x4xx_bist')
-rw-r--r--mpm/python/x4xx_bist1048
1 files changed, 1048 insertions, 0 deletions
diff --git a/mpm/python/x4xx_bist b/mpm/python/x4xx_bist
new file mode 100644
index 000000000..adda5ecb0
--- /dev/null
+++ b/mpm/python/x4xx_bist
@@ -0,0 +1,1048 @@
+#!/usr/bin/env python3
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+X4XX Built-In Self Test (BIST)
+
+Will work on all derivatives of the X4xx series.
+"""
+
+import sys
+import time
+import subprocess
+from usrp_mpm.sys_utils.udev import dt_symbol_get_spidev
+from usrp_mpm import bist
+
+# Timeout values are in seconds:
+GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute"
+GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test
+ # does not necessarily require GPS lock to pass, we
+ # reduce this value in order for the BIST to pass faster
+ # by default.
+DEFAULT_DB_ID = 0
+
+# We will import stuff as late as possible, intentionally, so let's calm down
+# PyLint
+# pylint: disable=import-outside-toplevel
+
+
+def get_rfdc_config(log):
+ """
+ Return resampling, halfband settings of current FPGA image.
+ """
+ from usrp_mpm.periph_manager import x4xx_rfdc_regs
+ from usrp_mpm.periph_manager import x4xx_rfdc_ctrl
+ rdfc_regs_control = x4xx_rfdc_regs.RfdcRegsControl(
+ x4xx_rfdc_ctrl.X4xxRfdcCtrl.rfdc_regs_label, log)
+ return rdfc_regs_control.get_rfdc_resampling_factor(0)
+
+
+##############################################################################
+# Bist class
+##############################################################################
+class X4XXBIST(bist.UsrpBIST):
+ """
+ BIST Tool for the USRP X4xx series
+ """
+ usrp_type = "X4XX"
+ # This defines special tests that are really collections of other tests.
+ collections = {
+ 'standard': ["gpsdo", "rtc", "temp", "fan"],
+ 'extended': "*",
+ }
+ # Default FPGA image type
+ DEFAULT_FPGA_TYPE = 'X4_200'
+ lv_compat_format = {
+ 'ddr3': {
+ 'throughput': -1,
+ },
+ 'gpsdo': {
+ "class": "",
+ "time": "",
+ "ept": -1,
+ "lat": -1,
+ "lon": -1,
+ "alt": -1,
+ "epx": -1,
+ "epy": -1,
+ "epv": -1,
+ "track": -1,
+ "speed": -1,
+ "climb": -1,
+ "eps": -1,
+ "mode": -1,
+ },
+ 'gpio': {
+ 'write_patterns': [],
+ 'read_patterns': [],
+ },
+ 'temp': {
+ "DRAM PCB": 40000,
+ "EC Internal": 41000,
+ "PMBUS-0": 42000,
+ "PMBUS-1": 43000,
+ "Power Supply PCB": 44000,
+ "RFSoC": 45000,
+ "Sample Clock PCB": 46000,
+ "TMP464 Internal": 47000,
+ },
+ 'fan': {
+ 'fan0': -1,
+ 'fan1': -1,
+ },
+ }
+ device_args = "type=x4xx,mgmt_addr=127.0.0.1"
+
+ def __init__(self):
+ bist.UsrpBIST.__init__(self)
+
+ def get_mb_periph_mgr(self):
+ """Return reference to an x4xx periph manager"""
+ from usrp_mpm.periph_manager.x4xx import x4xx
+ return x4xx
+
+ def get_product_id(self):
+ """Return the mboard product ID:"""
+ # TODO: use correct product ID, this is just a hack
+ # TODO: the path to the eeprom is not self-speaking like e.g. mb_eeprom
+ # return bist.get_product_id_from_eeprom(valid_ids=['x410'], eeprom='mb_eeprom')
+ return self.get_product_id_from_eeprom(
+ valid_ids=['0410'],
+ eeprom='/sys/bus/nvmem/devices/13-00500/nvmem')
+
+ def get_product_id_from_eeprom(self, valid_ids, eeprom):
+ """Return the mboard product ID
+ Returns something like x410...
+ """
+ # it is just here as override because eeprom-id installed on the
+ # x410 now needs a parameter while on n310 it runs without
+ # also the path to the eeprom is not self-speaking like e.g. mb_eeprom
+ # last problem is that the returned id is not 'x410' but '0410'
+ cmd = ['eeprom-id']
+ cmd.append(eeprom)
+ cmd = ' '.join(cmd)
+ output = subprocess.check_output(
+ cmd,
+ stderr=subprocess.STDOUT,
+ shell=True,
+ ).decode('utf-8')
+ sys.stderr.write("product_id_from_eeprom: {}".format(str(output)))
+ for valid_id in valid_ids:
+ if valid_id in output:
+ return 'x410'
+ raise AssertionError("Cannot determine product ID.: `{}'".format(output))
+
+ def reload_fpga(self, fpga_type, sfp0_addrs):
+ """
+ Loads the specified fpga and checks for sfp0 address, if the sfp0 address
+ existed, restores the original sfp0 address
+ """
+ from usrp_mpm.sys_utils import net
+ from pyroute2 import IPRoute
+ import errno
+ bist.load_fpga_image(
+ fpga_type,
+ self.device_args,
+ 'x410',
+ )
+ if sfp0_addrs:
+ ipr = IPRoute()
+ dev = ipr.link_lookup(ifname='sfp0')[0]
+ ipr.link('set', index=dev, state='down')
+ try:
+ ipr.addr('add', index=dev, address=sfp0_addrs[0], mask=24, label='sfp0')
+ except Exception as ex:
+ # If the addr already exists then ignore the error
+ if ex.code == errno.EEXIST:
+ pass
+ ipr.link('set', index=dev, state='up')
+
+ def check_fpga_type_clkaux(self):
+ """
+ clkaux BISTs require a OSS bitfile, check type and reimage if needed
+ If we have an OSS bitfile, we need to return a mcr that the clkaux
+ lmk can lock to. (Data clock rate of 122.88MHz)
+ """
+ from usrp_mpm.periph_manager import x4xx, x4xx_periphs
+
+ mboard_regs_control = x4xx_periphs.MboardRegsControl(
+ x4xx.x4xx.mboard_regs_label, self.log)
+ fpga_str = mboard_regs_control.get_fpga_type()
+ if not fpga_str or fpga_str == 'LV':
+ bist.load_fpga_image(
+ self.DEFAULT_FPGA_TYPE,
+ self.device_args,
+ 'x410',
+ )
+
+ rfdc_resamp, halfband = get_rfdc_config(self.log)
+ mcr = 122880000 * (8 / rfdc_resamp)
+ if halfband:
+ mcr = mcr / 2
+ return mcr
+
+#############################################################################
+# BISTS
+# All bist_* methods must return True/False success values!
+#############################################################################
+ def bist_gpsdo(self):
+ """
+ BIST for GPSDO
+ Description: Returns GPS information
+ External Equipment: None; Recommend attaching an antenna or providing
+ fake GPS information
+
+ Return dictionary: A TPV dictionary as returned by gpsd.
+ See also: http://www.catb.org/gpsd/gpsd_json.html
+
+ Check for mode 2 or 3 to see if it's locked.
+ """
+ assert 'gpsdo' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {
+ "class": "TPV",
+ "time": "2017-04-30T11:48:20.10Z",
+ "ept": 0.005,
+ "lat": 30.407899,
+ "lon": -97.726634,
+ "alt": 1327.689,
+ "epx": 15.319,
+ "epy": 17.054,
+ "epv": 124.484,
+ "track": 10.3797,
+ "speed": 0.091,
+ "climb": -0.085,
+ "eps": 34.11,
+ "mode": 3
+ }
+ from usrp_mpm.periph_manager import x4xx
+ # Turn on GPS, give some time to acclimatize
+ clk_aux_brd = x4xx.ClockingAuxBrdControl(default_source="gpsdo")
+ time.sleep(5)
+ gps_warmup_timeout = float(
+ self.args.option.get('gps_warmup_timeout', GPS_WARMUP_TIMEOUT))
+ gps_lockok_timeout = float(
+ self.args.option.get('gps_lockok_timeout', GPS_LOCKOK_TIMEOUT))
+ # Wait for WARMUP to go low
+ sys.stderr.write(
+ "Waiting for WARMUP to go low for up to {} seconds...\n".format(
+ gps_warmup_timeout))
+ if not bist.poll_with_timeout(
+ lambda: not clk_aux_brd.get_gps_warmup(),
+ gps_warmup_timeout*1000, 1000
+ ):
+ raise RuntimeError(
+ "GPS-WARMUP did not go low within {} seconds!".format(
+ gps_warmup_timeout))
+ sys.stderr.write("Chip is warmed up.\n")
+ # Wait for LOCKOK. Data sheet says wait up to 15 minutes for GPS lock.
+ sys.stderr.write(
+ "Waiting for LOCKOK to go high for up to {} seconds...\n".format(
+ gps_lockok_timeout))
+ if not bist.poll_with_timeout(
+ clk_aux_brd.get_gps_lock,
+ gps_lockok_timeout*1000,
+ 1000
+ ):
+ sys.stderr.write("No GPS-LOCKOK!\n")
+ sys.stderr.write("GPS-SURVEY status: {}\n".format(
+ clk_aux_brd.get_gps_survey()
+ ))
+ sys.stderr.write("GPS-PHASELOCK status: {}\n".format(
+ clk_aux_brd.get_gps_phase_lock()
+ ))
+ sys.stderr.write("GPS-ALARM status: {}\n".format(
+ clk_aux_brd.get_gps_alarm()
+ ))
+ # Now the chip is on, read back the TPV result
+ result = bist.get_gpsd_tpv_result()
+ # If we reach this line, we have a valid result and the chip responded.
+ # However, it doesn't necessarily mean we had a GPS lock.
+ return True, result
+
+ def bist_ref_clock_mboard(self):
+ """
+ BIST for clock lock from mboard clock source (local to the motherboard)
+
+ Description: Checks to see if the motherboard can lock to its internal
+ clock source.
+
+ External Equipment: None
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_mboard' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ from usrp_mpm.periph_manager import x4xx_rfdc_ctrl
+ rfdc_resamp, fpga_halfband = get_rfdc_config(self.log)
+ mcrs_to_test = []
+ for master_clock_rate, (_, decimation, _, halfband) in \
+ x4xx_rfdc_ctrl.X4xxRfdcCtrl.master_to_sample_clk.items():
+ if decimation == rfdc_resamp and fpga_halfband == halfband:
+ mcrs_to_test.append(master_clock_rate)
+
+ for master_clock_rate in mcrs_to_test:
+ sys.stderr.write("Testing master_clock_rate {}".format(master_clock_rate))
+ result = bist.get_ref_clock_prop(
+ 'mboard',
+ 'internal',
+ extra_args={
+ 'addr': '169.254.0.2',
+ 'mgmt_addr': '127.0.0.1',
+ 'master_clock_rate': master_clock_rate,
+ }
+ )
+ if 'error_msg' in result:
+ return False, result
+ return True, result
+
+ def bist_ref_clock_ext(self):
+ """
+ BIST for clock lock from external source.
+
+ Description: Checks to see if the motherboard can lock to the external
+ reference clock.
+
+ External Equipment: 10 MHz reference source connected to "ref in".
+
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_ext' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop(
+ 'external',
+ 'external',
+ extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'}
+ )
+ return 'error_msg' not in result, result
+
+ def bist_ref_clock_gpsdo(self):
+ """
+ BIST for clock lock from gpsdo source.
+
+ Description: Checks to see if the motherboard can lock to the gpsdo
+ reference clock.
+
+ External Equipment: None
+
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_gpsdo' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop(
+ 'gpsdo',
+ 'gpsdo',
+ extra_args={}
+ )
+ return 'error_msg' not in result, result
+
+ def bist_ref_clock_int(self):
+ """
+ BIST for clock lock from internal source.
+
+ Description: Checks to see if the motherboard can lock to the
+ clocking aux board's internal reference clock.
+
+ External Equipment: None
+
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_int' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop(
+ 'internal',
+ 'internal',
+ extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'}
+ )
+ return 'error_msg' not in result, result
+
+ def bist_ref_clock_nsync(self):
+ """
+ BIST for clock lock from nsync source.
+
+ Description: Checks to see if the motherboard can lock to the nsync
+ reference clock.
+
+ External Equipment: None
+
+ Return dictionary:
+ - <sensor-name>:
+ - locked: Boolean lock status
+
+ There can be multiple ref lock sensors; for a pass condition they all
+ need to be asserted.
+ """
+ assert 'ref_clock_nsync' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'ref_locked': True}
+ result = bist.get_ref_clock_prop(
+ 'nsync',
+ 'internal',
+ extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'}
+ )
+ return 'error_msg' not in result, result
+
+ def bist_nsync_fabric(self):
+ """
+ BIST for testing the fabric_clk signal from the motherboard
+
+ Description: Checks to see if the LMK on the clocking auxiliary board
+ can lock to the fabric clock signal output by the motherboard. We check
+ this by verifying that the pri_ref signal is being used, the dpll is locked,
+ the apll1 is locked, and apll2 is unlocked.
+
+ External Equipment: None
+
+ Return dictionary:
+ - fabric_clk pll lock: Did the clkaux lmk lock to the fabric_clk signal
+ """
+ assert 'nsync_fabric' in self.tests_to_run
+ import uhd
+ from uhd.usrp import multi_usrp
+
+ try:
+ mcr = self.check_fpga_type_clkaux()
+ usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
+ "master_clock_rate={}".format(str(mcr)))
+ except Exception as ex:
+ return False, {
+ 'error_msg': "Failed to create usrp device: {}".format(str(ex))
+ }
+
+ mpm_c = usrp_dev.get_mpm_client()
+
+ mpm_c.nsync_change_input_source('fabric_clk')
+ # The lock should happen fast, but give it half a second
+ time.sleep(0.5)
+ # Status 1 checks that the lmk is configured to use the priref signal
+ # True = secref, False = priref
+ using_pri_ref = not mpm_c.clkaux_get_nsync_status1()
+ # Status 0 checks dpll loss of lock
+ dpll_lock = not mpm_c.clkaux_get_nsync_status0()
+ # Reg 0xD checks several APLL statuses, we need to make sure APLL1 is
+ # locked and APLL2 is unlocked
+ apll_lock = mpm_c.peek_clkaux(0xD) == '0x8'
+
+ result = using_pri_ref and dpll_lock and apll_lock
+
+ mpm_c.enable_ecpri_clocks(False)
+
+ return result, {"pri_ref_selected": using_pri_ref,
+ "dpll_locked": dpll_lock,
+ "apll1_locked_apll2_unlocked": apll_lock}
+
+ def bist_nsync_gty(self):
+ """
+ BIST for testing the gty_rcv_clk signal from the motherboard
+
+ Description: Checks to see if the LMK on the clocking auxiliary board
+ can lock to the gty_rcv clock signal output by the motherboard. We check
+ this by verifying that the pri_ref signal is being used, the dpll is locked,
+ the apll1 is locked, and apll2 is unlocked.
+
+ External Equipment: None
+
+ Return dictionary:
+ - gty_rcv_clk pll lock: Did the clkaux lmk lock to the gty_rcv_clk signal
+ """
+ assert 'nsync_gty' in self.tests_to_run
+ import uhd
+ from uhd.usrp import multi_usrp
+
+ try:
+ mcr = self.check_fpga_type_clkaux()
+ usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
+ "master_clock_rate={}".format(str(mcr)))
+ except Exception as ex:
+ return False, {
+ 'error_msg': "Failed to create usrp device: {}".format(str(ex))
+ }
+
+ mpm_c = usrp_dev.get_mpm_client()
+
+ mpm_c.nsync_change_input_source('gty_rcv_clk')
+ # The lock should happen fast, but give it half a second
+ time.sleep(0.5)
+ # Status 1 checks that the lmk is configured to use the priref signal
+ # True = secref, False = priref
+ using_pri_ref = not mpm_c.clkaux_get_nsync_status1()
+ # Status 0 checks dpll loss of lock
+ dpll_lock = not mpm_c.clkaux_get_nsync_status0()
+ # Reg 0xD checks several APLL statuses, we need to make sure APLL1 is
+ # locked and APLL2 is unlocked
+ apll_lock = mpm_c.peek_clkaux(0xD) == '0x8'
+
+ result = using_pri_ref and dpll_lock and apll_lock
+
+ mpm_c.enable_ecpri_clocks(False)
+ # Set the clock source back to internal, as the register values
+ # for locking to the gty_rcv_clk cause the mboard to lose ref_lock
+ # to the nsync lmk.
+ mpm_c.set_clock_source('internal')
+
+ return result, {"pri_ref_selected": using_pri_ref,
+ "dpll_locked": dpll_lock,
+ "apll1_locked_apll2_unlocked": apll_lock}
+
+ def bist_clkaux_fpga_aux_ref(self):
+ """
+ BIST for testing the fpga_aux_ref pps source
+
+ Description: Checks to see if the fpga_aux_ref can output a valid pps signal
+
+ External Equipment: None
+
+ Return dictionary:
+ """
+ assert 'clkaux_fpga_aux_ref' in self.tests_to_run
+ import uhd
+ from uhd.usrp import multi_usrp
+
+ try:
+ mcr = self.check_fpga_type_clkaux()
+ usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
+ "master_clock_rate={}".format(str(mcr)))
+ except Exception as ex:
+ return False, {
+ 'error_msg': "Failed to create usrp device: {}".format(str(ex))
+ }
+
+ mpm_c = usrp_dev.get_mpm_client()
+
+ count = mpm_c.get_fpga_aux_ref_freq()
+
+ # We expect a pps signal, pulse is reported in 40 MHz clock ticks, so
+ # 1 PPS is expected to return 40 million ticks, this gives 1% tolerance
+ return 39600000 < count < 40400000, {}
+
+ def bist_nsync_rpll_config(self):
+ """
+ BIST for testing that the LMK28PRIRefClk can be used as a source for the
+ motherboard RPLL
+
+ Description: Enable the LMK on the clocking auxiliary board to output a signal
+ on the LMK28PRIRefClk line to the motherboard RPLL. Configure the RPLL to use
+ this source, and check to see if the RPLL can lock to the source.
+
+ External Equipment: None
+
+ Return dictionary:
+ """
+ assert 'nsync_rpll_config' in self.tests_to_run
+ import uhd
+ from uhd.usrp import multi_usrp
+ from usrp_mpm.sys_utils import net
+
+ try:
+ mcr = self.check_fpga_type_clkaux()
+ usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync,"
+ "master_clock_rate={}".format(str(mcr)))
+ except Exception as ex:
+ return False, {
+ 'error_msg': "Failed to create usrp device: {}".format(str(ex))
+ }
+
+ sfp0_addrs = net.get_iface_info('sfp0')['ip_addrs']
+
+ mpm_c = usrp_dev.get_mpm_client()
+
+ mpm_c.config_rpll_to_nsync()
+
+ ref_locked = mpm_c.get_ref_lock_sensor()
+
+ result = ref_locked.get('value') == 'true'
+
+ fpga_type = mpm_c.get_device_info()['fpga']
+
+ del usrp_dev
+
+ # This is a brute-force way of making sure the device gets back into a clean state after
+ # we touched the rpll configuration
+ self.reload_fpga(fpga_type, sfp0_addrs)
+
+ return result, ref_locked
+
+ def bist_gpio(self):
+ """
+ BIST for GPIO
+ Description: Writes and reads the values to the GPIO
+
+ Needed Equipment: External loopback cable between port 0 and port 1
+
+ Notes:
+ - X410 has two FP-GPIO connectors (HDMI connectors) with 12 programmable
+ pins each
+
+ Return dictionary:
+ - write_patterns: A list of patterns that were written
+ - read_patterns: A list of patterns that were read back
+ """
+ assert 'gpio' in self.tests_to_run
+ if self.args.dry_run:
+ patterns = range(64)
+ return True, {
+ 'write_patterns': list(patterns),
+ 'read_patterns': list(patterns),
+ }
+ from usrp_mpm.periph_manager import x4xx, x4xx_periphs, x4xx_mb_cpld
+ mboard_regs_control = x4xx_periphs.MboardRegsControl(
+ x4xx.x4xx.mboard_regs_label, self.log)
+ cpld_spi_node = dt_symbol_get_spidev('mb_cpld')
+ cpld_control = x4xx_mb_cpld.MboardCPLD(cpld_spi_node, self.log)
+ gpio_diocontrol = x4xx_periphs.DioControl(
+ mboard_regs_control,
+ cpld_control,
+ self.log)
+ def _run_sub_test(inport, outport, pin_mode, voltage, pattern):
+ """
+ Closure to run an actual test. The GPIO control object is enclosed.
+
+ Arguments:
+ inport: "port" argument for DioControl, input port
+ outport: "port" argument for DioControl, input port
+ pin_mode: HDMI or DIO (see DioControl)
+ voltage: Valid arg for DioControl.set_voltage_level()
+ pattern: Bits to write to the inport, should be read back at outport
+ """
+ gpio_diocontrol.set_port_mapping(pin_mode)
+ # We set all pins to be driven by the PS
+ # in HDMI mode not all pins can be accessed by the user
+ if pin_mode == "HDMI":
+ mask = 0xDB6D
+ else:
+ mask = 0xFFF
+ gpio_diocontrol.set_pin_masters(inport, mask)
+ gpio_diocontrol.set_pin_masters(outport, mask)
+ gpio_diocontrol.set_voltage_level(inport, voltage)
+ gpio_diocontrol.set_voltage_level(outport, voltage)
+ gpio_diocontrol.set_pin_directions(inport, 0x00000)
+ gpio_diocontrol.set_pin_directions(outport, 0xFFFFF)
+ gpio_diocontrol.set_pin_outputs(outport, pattern)
+ read_values = gpio_diocontrol.get_pin_inputs(inport)
+ if (pattern & mask) != read_values:
+ sys.stderr.write(gpio_diocontrol.status())
+ return False, {'write_patterns': ["0x{:04X}".format(pattern)],
+ 'read_patterns': ["0x{:04X}".format(read_values)]}
+ return True, {'write_patterns': ["0x{:04X}".format(pattern)],
+ 'read_patterns': ["0x{:04X}".format(read_values)]}
+ # Now run tests:
+ for voltage in ["1V8", "2V5", "3V3"]:
+ for mode in ["DIO", "HDMI"]:
+ for pattern in [0xFFFF, 0xA5A5, 0x5A5A, 0x0000]:
+ sys.stderr.write("test: PortA -> PortB, {}, {}, 0x{:04X}"
+ .format(voltage, mode, pattern))
+ status, data = _run_sub_test(
+ "PORTB", "PORTA", mode, voltage, pattern)
+ if not status:
+ return status, data
+ sys.stderr.write("test: PortB -> PortA, {}, {}, 0x{:04X}"
+ .format(voltage, mode, pattern))
+ status, data = _run_sub_test(
+ "PORTA", "PORTB", mode, voltage, pattern)
+ if not status:
+ return status, data
+ return status, data
+
+
+ def bist_qsfp(self):
+ """
+ BiST for QSFP status and property read out.
+
+ Description: Tests MODSEL (write) and MODPRS (read) pin at QSFP port
+ and I2C communication with QSFP module. By default all
+ ports are tested. The user can add module to the option
+ argument when calling the test to select a specific
+ port out of [0,1]. A negative number will test all
+ ports (the default)
+
+ Example: The following example will run the test on port 0:
+ > x4xx_bist qsfp --option module=0
+
+ Needed Equipment: None, for exhaustive test results the test should
+ be run with loopback test modules.
+
+ Notes: The test ensures consistency of I2C communication and the
+ MODSEL and MODPRS pins. If MODPRS pin is active low
+ (module present) I2C communication must return valid values for
+ all QSFP properties. On the other hand when MODPRS pin is high
+ QSFP properties should report None as the only valid value.
+ The test also disables the I2C communication (unsetting MODSEL
+ pin) and checks that the low level I2C communication with the
+ module fails. The communication is reenabled after 3sec. This
+ window allows a tester to check whether a loopback adapter
+ signals the state of MODSEL correctly.
+ """
+
+ assert 'qsfp' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {}
+
+ from usrp_mpm.periph_manager import x4xx
+ from usrp_mpm.periph_manager.x4xx_periphs import QSFPModule
+
+ def add_error_msg(msg):
+ # add error message to result map
+ if "error_msg" not in infos:
+ infos["error_msg"] = []
+ infos["error_msg"].append(msg)
+
+ def modules_to_test():
+ module = self.args.option.get("module", -1)
+ try:
+ module = int(module)
+ except ValueError:
+ add_error_msg("Module '{}' is not an int".format(module))
+ return None
+ if module < 0:
+ return x4xx.X400_QSFP_I2C_CONFIGS
+ if module not in [0, 1]:
+ add_error_msg("Module to test must be 0 or 1")
+ return None
+ return [x4xx.X400_QSFP_I2C_CONFIGS[module]]
+
+
+ infos = {}
+ modules_to_test = modules_to_test()
+ if not modules_to_test:
+ return False, infos
+
+ result = True
+
+ for config in modules_to_test:
+ qsfp = QSFPModule(
+ config.modprs, config.modsel, config.devsymbol, self.log)
+ info = {}
+ info["available"] = qsfp.is_available()
+ if info["available"]:
+ # if adapter is available, read out prop via I2C and
+ # verify they are all valid (not None)
+ props = [qsfp.decoded_status,
+ qsfp.vendor_name,
+ qsfp.connector_type]
+ for prop in props:
+ info[prop.__name__] = prop()
+ if not all(info.values()):
+ result = False
+ add_error_msg("QSFP adapter at {} is present but has "
+ "None property.".format(config.devsymbol))
+ else:
+ # if adapter is not available
+ # reading a property *must* return None
+ if qsfp.connector_type() is not None:
+ result = False
+ add_error_msg("QSFP adapter at {} reports valid property "
+ "but is not present.".format(config.devsymbol))
+
+ infos[config.devsymbol] = info
+
+ # disable i2c communication and check for failure on low level read
+ qsfp.enable_i2c(False)
+ try:
+ qsfp.qsfp_regs.peek8(0)
+ result = False
+ add_error_msg("Reading I2C when disabled should fail with "
+ "RuntimeError at {}\n".format(config.devsymbol))
+ except RuntimeError:
+ pass
+ sys.stderr.write("A loopback QSFP adapter at {} should report MODSEL "
+ "inactive for 3 sec.".format(config.devsymbol))
+ time.sleep(3)
+ qsfp.enable_i2c(True)
+
+ return result, infos
+
+
+ def bist_temp(self):
+ """
+ BIST for temperature sensors
+ Description: Reads the temperature sensors on the motherboards and
+ returns their values in mC
+
+ Return dictionary:
+ - <thermal-zone-name>: temp in mC
+ """
+ assert 'temp' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {"DRAM PCB": 40000,
+ "EC Internal": 41000,
+ "PMBUS-0": 42000,
+ "PMBUS-1": 43000,
+ "Power Supply PCB": 44000,
+ "RFSoC": 45000,
+ "Sample Clock PCB": 46000,
+ "TMP464 Internal": 47000,
+ }
+
+ result = bist.get_iio_temp_sensor_values()
+
+ if len(result) < 1:
+ result['error_msg'] = "No temperature sensors found!"
+
+ return 'error_msg' not in result, result
+
+ def bist_fan(self):
+ """
+ BIST for fans
+ Description: Reads the RPM values of the fans on the motherboard
+
+ Return dictionary:
+ - <fan-name>: Fan speed in RPM
+
+ External Equipment: None
+ """
+ assert 'fan' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {'fan0': 10000, 'fan1': 10000}
+ result = bist.get_ectool_fan_values()
+ return len(result) == 2, result
+
+ def _db_flash_init(self, db_flash):
+ """
+ Initialize the specified DB Flash and verify
+ its state
+ """
+ db_flash.init()
+ if not db_flash.initialized:
+ raise RuntimeError()
+
+ def _db_flash_deinit(self, db_flash):
+ """
+ De-initialize the specified DB Flash and verify
+ its state
+ """
+ db_flash.deinit()
+ if db_flash.initialized:
+ raise RuntimeError()
+
+ def bist_spi_flash_integrity(self):
+ """
+ BIST for SPI flash on DB
+ Description: Performs data integrity test on a section of
+ the flash memory. Stop the MPM service before running this
+ test using "systemctl stop usrp-hwd" command.
+
+ External Equipment: None, but at least one daughterboard
+ should be installed in the X410 unit.
+
+ Return dictionary:
+
+ """
+ assert 'spi_flash_integrity' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {}
+ import os
+ from usrp_mpm.sys_utils.db_flash import DBFlash
+ FIXED_MEMORY_PATTERN = 'fixed'
+ RANDOM_MEMORY_PATTERN = 'random'
+
+ db_id = int(self.args.option.get('db_id', DEFAULT_DB_ID))
+ assert db_id in [0, 1]
+
+ db_flash = DBFlash(db_id, log=None)
+
+ buf_size = 100
+ memory_pattern = str(self.args.option.get('memory_pattern', RANDOM_MEMORY_PATTERN))
+ assert memory_pattern in (FIXED_MEMORY_PATTERN, RANDOM_MEMORY_PATTERN)
+ if memory_pattern == RANDOM_MEMORY_PATTERN:
+ buff = os.urandom(buf_size)
+ else:
+ buff = [0xA5] * buf_size
+
+ data_valid = False
+
+ try:
+ self._db_flash_init(db_flash)
+ except (ValueError, RuntimeError) as ex:
+ return False, {"error_msg": "Error while initializing flash storage: " + str(ex)}
+
+ file_path = f'/mnt/db{db_id}_flash/test.bin'
+
+ sys.stderr.write("Testing DB{} with {} memory pattern..".format(db_id, memory_pattern))
+ with open(file_path, "wb") as f:
+ f.write(bytearray(buff))
+
+ try:
+ self._db_flash_deinit(db_flash)
+ self._db_flash_init(db_flash)
+ except (ValueError, RuntimeError) as ex:
+ return False, {"error_msg": "Error while init(deinit)ializing flash storage: " + str(ex)}
+
+ with open(file_path, "rb") as f:
+ read_data = f.read()
+ data_valid = read_data == bytearray(buff)
+ os.remove(file_path)
+ self._db_flash_deinit(db_flash)
+
+ return data_valid, {}
+
+ def bist_spi_flash_speed(self):
+ """
+ BIST for SPI flash on DB
+ Description: Performs read and write speed test on the SPI flash
+ memory on DB. Stop the MPM service before running this test
+ using "systemctl stop usrp-hwd" command.
+
+ External Equipment: None, but at least one daughterboard
+ should be installed in the X410 unit.
+
+ Return dictionary:
+
+ """
+ assert 'spi_flash_speed' in self.tests_to_run
+ if self.args.dry_run:
+ return True, {}
+ import os
+ import re
+ from usrp_mpm.sys_utils.db_flash import DBFlash
+
+ MIN_WRITE_SPEED = 5000 #B/s
+ MIN_READ_SPEED = 5000 #B/s
+ def parse_speed(cmd_output):
+ mobj = re.search(
+ r"(.*records in\n)(.*records out\n)(.* (?P<speed>[0-9.]+) (?P<order>\S?)B\/s)",
+ cmd_output)
+ if mobj is None:
+ return 0
+ scale = {'': 1, 'k': 1024, 'M': 1024*1024, 'G': 1024*1024*1024}
+ order = mobj.group('order')
+ if order not in scale:
+ raise ValueError(f"unsupported unit '{order}B/s'")
+ return float(mobj.group('speed')) * scale[order]
+
+ db_id = int(self.args.option.get('db_id', DEFAULT_DB_ID))
+ assert db_id in [0, 1]
+
+ db_flash = DBFlash(db_id, log=None)
+
+ file_path = f'/mnt/db{db_id}_flash/test.bin'
+
+ try:
+ self._db_flash_init(db_flash)
+ except (ValueError, RuntimeError) as ex:
+ return False, {
+ "error_msg": "Error while initializing flash storage: {}".format(str(ex))
+ }
+
+ sys.stderr.write("Testing DB{}..".format(db_id))
+ write_error_msg = None
+ sys.stderr.write("Write Speed Test:")
+ cmd = [
+ 'dd',
+ 'if=/dev/zero',
+ 'of=' + file_path,
+ 'bs=512',
+ 'count=1000',
+ 'oflag=dsync'
+ ]
+
+ try:
+ output = subprocess.check_output(
+ cmd,
+ stderr=subprocess.STDOUT,
+ )
+ except subprocess.CalledProcessError as ex:
+ output = ex.output
+ write_error_msg = "Error during Write Test: {}".format(output)
+ write_test_output = output.decode("utf-8")
+ sys.stderr.write(write_test_output)
+
+ # De-init and init flash here to mitigate any effects
+ # caching may have on the read speed.
+ try:
+ self._db_flash_deinit(db_flash)
+ self._db_flash_init(db_flash)
+ except (ValueError, RuntimeError) as ex:
+ return False, {
+ "error_msg": "Error while init(deinit)ializing flash storage: {}".format(str(ex))}
+
+ sys.stderr.write("Read Speed Test:")
+ read_error_msg = None
+ cmd = [
+ 'dd',
+ 'if=' + file_path,
+ 'of=/dev/null',
+ 'bs=512',
+ 'count=1000',
+ 'oflag=dsync'
+ ]
+
+ try:
+ output = subprocess.check_output(
+ cmd,
+ stderr=subprocess.STDOUT,
+ )
+ except subprocess.CalledProcessError as ex:
+ output = ex.output
+ read_error_msg = "Error during Read Test: {}".format(output)
+ read_test_output = output.decode("utf-8")
+ sys.stderr.write(read_test_output)
+
+ # Clean up.
+ os.remove(file_path)
+ self._db_flash_deinit(db_flash)
+
+ test_status = bool(write_error_msg is None and read_error_msg is None)
+
+ if test_status:
+ write_speed = parse_speed(write_test_output)
+ read_speed = parse_speed(read_test_output)
+
+ if write_speed == 0:
+ test_status = False
+ write_error_msg = "Write speed parse error"
+ elif write_speed < MIN_WRITE_SPEED:
+ test_status = False
+ write_error_msg = \
+ "Write speed {} B/s is below minimum requirement of {} B/s".format(
+ write_speed, MIN_WRITE_SPEED)
+
+ if read_speed == 0:
+ test_status = False
+ read_error_msg = "Read speed parse error"
+ elif read_speed < MIN_READ_SPEED:
+ test_status = False
+ read_error_msg = \
+ "Read speed {} B/s is below minimum requirement of {} B/s".format(
+ read_speed, MIN_READ_SPEED)
+
+ return test_status, {
+ "Write Test": write_test_output if write_error_msg is None else write_error_msg,
+ "Read Test": read_test_output if read_error_msg is None else read_error_msg
+ }
+
+
+##############################################################################
+# main
+##############################################################################
+def main():
+ " Go, go, go! "
+ return X4XXBIST().run()
+
+if __name__ == '__main__':
+ sys.exit(not main())