From 2a575bf9b5a4942f60e979161764b9e942699e1e Mon Sep 17 00:00:00 2001 From: Lars Amsel Date: Fri, 4 Jun 2021 08:27:50 +0200 Subject: uhd: Add support for the USRP X410 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lars Amsel Co-authored-by: Michael Auchter Co-authored-by: Martin Braun Co-authored-by: Paul Butler Co-authored-by: Cristina Fuentes Co-authored-by: Humberto Jimenez Co-authored-by: Virendra Kakade Co-authored-by: Lane Kolbly Co-authored-by: Max Köhler Co-authored-by: Andrew Lynch Co-authored-by: Grant Meyerhoff Co-authored-by: Ciro Nishiguchi Co-authored-by: Thomas Vogel --- mpm/python/x4xx_bist | 1048 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1048 insertions(+) create mode 100644 mpm/python/x4xx_bist (limited to 'mpm/python/x4xx_bist') diff --git a/mpm/python/x4xx_bist b/mpm/python/x4xx_bist new file mode 100644 index 000000000..adda5ecb0 --- /dev/null +++ b/mpm/python/x4xx_bist @@ -0,0 +1,1048 @@ +#!/usr/bin/env python3 +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +X4XX Built-In Self Test (BIST) + +Will work on all derivatives of the X4xx series. +""" + +import sys +import time +import subprocess +from usrp_mpm.sys_utils.udev import dt_symbol_get_spidev +from usrp_mpm import bist + +# Timeout values are in seconds: +GPS_WARMUP_TIMEOUT = 70 # Data sheet says "about a minute" +GPS_LOCKOK_TIMEOUT = 2 # Data sheet says about 15 minutes. Because our test + # does not necessarily require GPS lock to pass, we + # reduce this value in order for the BIST to pass faster + # by default. +DEFAULT_DB_ID = 0 + +# We will import stuff as late as possible, intentionally, so let's calm down +# PyLint +# pylint: disable=import-outside-toplevel + + +def get_rfdc_config(log): + """ + Return resampling, halfband settings of current FPGA image. + """ + from usrp_mpm.periph_manager import x4xx_rfdc_regs + from usrp_mpm.periph_manager import x4xx_rfdc_ctrl + rdfc_regs_control = x4xx_rfdc_regs.RfdcRegsControl( + x4xx_rfdc_ctrl.X4xxRfdcCtrl.rfdc_regs_label, log) + return rdfc_regs_control.get_rfdc_resampling_factor(0) + + +############################################################################## +# Bist class +############################################################################## +class X4XXBIST(bist.UsrpBIST): + """ + BIST Tool for the USRP X4xx series + """ + usrp_type = "X4XX" + # This defines special tests that are really collections of other tests. + collections = { + 'standard': ["gpsdo", "rtc", "temp", "fan"], + 'extended': "*", + } + # Default FPGA image type + DEFAULT_FPGA_TYPE = 'X4_200' + lv_compat_format = { + 'ddr3': { + 'throughput': -1, + }, + 'gpsdo': { + "class": "", + "time": "", + "ept": -1, + "lat": -1, + "lon": -1, + "alt": -1, + "epx": -1, + "epy": -1, + "epv": -1, + "track": -1, + "speed": -1, + "climb": -1, + "eps": -1, + "mode": -1, + }, + 'gpio': { + 'write_patterns': [], + 'read_patterns': [], + }, + 'temp': { + "DRAM PCB": 40000, + "EC Internal": 41000, + "PMBUS-0": 42000, + "PMBUS-1": 43000, + "Power Supply PCB": 44000, + "RFSoC": 45000, + "Sample Clock PCB": 46000, + "TMP464 Internal": 47000, + }, + 'fan': { + 'fan0': -1, + 'fan1': -1, + }, + } + device_args = "type=x4xx,mgmt_addr=127.0.0.1" + + def __init__(self): + bist.UsrpBIST.__init__(self) + + def get_mb_periph_mgr(self): + """Return reference to an x4xx periph manager""" + from usrp_mpm.periph_manager.x4xx import x4xx + return x4xx + + def get_product_id(self): + """Return the mboard product ID:""" + # TODO: use correct product ID, this is just a hack + # TODO: the path to the eeprom is not self-speaking like e.g. mb_eeprom + # return bist.get_product_id_from_eeprom(valid_ids=['x410'], eeprom='mb_eeprom') + return self.get_product_id_from_eeprom( + valid_ids=['0410'], + eeprom='/sys/bus/nvmem/devices/13-00500/nvmem') + + def get_product_id_from_eeprom(self, valid_ids, eeprom): + """Return the mboard product ID + Returns something like x410... + """ + # it is just here as override because eeprom-id installed on the + # x410 now needs a parameter while on n310 it runs without + # also the path to the eeprom is not self-speaking like e.g. mb_eeprom + # last problem is that the returned id is not 'x410' but '0410' + cmd = ['eeprom-id'] + cmd.append(eeprom) + cmd = ' '.join(cmd) + output = subprocess.check_output( + cmd, + stderr=subprocess.STDOUT, + shell=True, + ).decode('utf-8') + sys.stderr.write("product_id_from_eeprom: {}".format(str(output))) + for valid_id in valid_ids: + if valid_id in output: + return 'x410' + raise AssertionError("Cannot determine product ID.: `{}'".format(output)) + + def reload_fpga(self, fpga_type, sfp0_addrs): + """ + Loads the specified fpga and checks for sfp0 address, if the sfp0 address + existed, restores the original sfp0 address + """ + from usrp_mpm.sys_utils import net + from pyroute2 import IPRoute + import errno + bist.load_fpga_image( + fpga_type, + self.device_args, + 'x410', + ) + if sfp0_addrs: + ipr = IPRoute() + dev = ipr.link_lookup(ifname='sfp0')[0] + ipr.link('set', index=dev, state='down') + try: + ipr.addr('add', index=dev, address=sfp0_addrs[0], mask=24, label='sfp0') + except Exception as ex: + # If the addr already exists then ignore the error + if ex.code == errno.EEXIST: + pass + ipr.link('set', index=dev, state='up') + + def check_fpga_type_clkaux(self): + """ + clkaux BISTs require a OSS bitfile, check type and reimage if needed + If we have an OSS bitfile, we need to return a mcr that the clkaux + lmk can lock to. (Data clock rate of 122.88MHz) + """ + from usrp_mpm.periph_manager import x4xx, x4xx_periphs + + mboard_regs_control = x4xx_periphs.MboardRegsControl( + x4xx.x4xx.mboard_regs_label, self.log) + fpga_str = mboard_regs_control.get_fpga_type() + if not fpga_str or fpga_str == 'LV': + bist.load_fpga_image( + self.DEFAULT_FPGA_TYPE, + self.device_args, + 'x410', + ) + + rfdc_resamp, halfband = get_rfdc_config(self.log) + mcr = 122880000 * (8 / rfdc_resamp) + if halfband: + mcr = mcr / 2 + return mcr + +############################################################################# +# BISTS +# All bist_* methods must return True/False success values! +############################################################################# + def bist_gpsdo(self): + """ + BIST for GPSDO + Description: Returns GPS information + External Equipment: None; Recommend attaching an antenna or providing + fake GPS information + + Return dictionary: A TPV dictionary as returned by gpsd. + See also: http://www.catb.org/gpsd/gpsd_json.html + + Check for mode 2 or 3 to see if it's locked. + """ + assert 'gpsdo' in self.tests_to_run + if self.args.dry_run: + return True, { + "class": "TPV", + "time": "2017-04-30T11:48:20.10Z", + "ept": 0.005, + "lat": 30.407899, + "lon": -97.726634, + "alt": 1327.689, + "epx": 15.319, + "epy": 17.054, + "epv": 124.484, + "track": 10.3797, + "speed": 0.091, + "climb": -0.085, + "eps": 34.11, + "mode": 3 + } + from usrp_mpm.periph_manager import x4xx + # Turn on GPS, give some time to acclimatize + clk_aux_brd = x4xx.ClockingAuxBrdControl(default_source="gpsdo") + time.sleep(5) + gps_warmup_timeout = float( + self.args.option.get('gps_warmup_timeout', GPS_WARMUP_TIMEOUT)) + gps_lockok_timeout = float( + self.args.option.get('gps_lockok_timeout', GPS_LOCKOK_TIMEOUT)) + # Wait for WARMUP to go low + sys.stderr.write( + "Waiting for WARMUP to go low for up to {} seconds...\n".format( + gps_warmup_timeout)) + if not bist.poll_with_timeout( + lambda: not clk_aux_brd.get_gps_warmup(), + gps_warmup_timeout*1000, 1000 + ): + raise RuntimeError( + "GPS-WARMUP did not go low within {} seconds!".format( + gps_warmup_timeout)) + sys.stderr.write("Chip is warmed up.\n") + # Wait for LOCKOK. Data sheet says wait up to 15 minutes for GPS lock. + sys.stderr.write( + "Waiting for LOCKOK to go high for up to {} seconds...\n".format( + gps_lockok_timeout)) + if not bist.poll_with_timeout( + clk_aux_brd.get_gps_lock, + gps_lockok_timeout*1000, + 1000 + ): + sys.stderr.write("No GPS-LOCKOK!\n") + sys.stderr.write("GPS-SURVEY status: {}\n".format( + clk_aux_brd.get_gps_survey() + )) + sys.stderr.write("GPS-PHASELOCK status: {}\n".format( + clk_aux_brd.get_gps_phase_lock() + )) + sys.stderr.write("GPS-ALARM status: {}\n".format( + clk_aux_brd.get_gps_alarm() + )) + # Now the chip is on, read back the TPV result + result = bist.get_gpsd_tpv_result() + # If we reach this line, we have a valid result and the chip responded. + # However, it doesn't necessarily mean we had a GPS lock. + return True, result + + def bist_ref_clock_mboard(self): + """ + BIST for clock lock from mboard clock source (local to the motherboard) + + Description: Checks to see if the motherboard can lock to its internal + clock source. + + External Equipment: None + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_mboard' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + from usrp_mpm.periph_manager import x4xx_rfdc_ctrl + rfdc_resamp, fpga_halfband = get_rfdc_config(self.log) + mcrs_to_test = [] + for master_clock_rate, (_, decimation, _, halfband) in \ + x4xx_rfdc_ctrl.X4xxRfdcCtrl.master_to_sample_clk.items(): + if decimation == rfdc_resamp and fpga_halfband == halfband: + mcrs_to_test.append(master_clock_rate) + + for master_clock_rate in mcrs_to_test: + sys.stderr.write("Testing master_clock_rate {}".format(master_clock_rate)) + result = bist.get_ref_clock_prop( + 'mboard', + 'internal', + extra_args={ + 'addr': '169.254.0.2', + 'mgmt_addr': '127.0.0.1', + 'master_clock_rate': master_clock_rate, + } + ) + if 'error_msg' in result: + return False, result + return True, result + + def bist_ref_clock_ext(self): + """ + BIST for clock lock from external source. + + Description: Checks to see if the motherboard can lock to the external + reference clock. + + External Equipment: 10 MHz reference source connected to "ref in". + + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_ext' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop( + 'external', + 'external', + extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'} + ) + return 'error_msg' not in result, result + + def bist_ref_clock_gpsdo(self): + """ + BIST for clock lock from gpsdo source. + + Description: Checks to see if the motherboard can lock to the gpsdo + reference clock. + + External Equipment: None + + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_gpsdo' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop( + 'gpsdo', + 'gpsdo', + extra_args={} + ) + return 'error_msg' not in result, result + + def bist_ref_clock_int(self): + """ + BIST for clock lock from internal source. + + Description: Checks to see if the motherboard can lock to the + clocking aux board's internal reference clock. + + External Equipment: None + + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_int' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop( + 'internal', + 'internal', + extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'} + ) + return 'error_msg' not in result, result + + def bist_ref_clock_nsync(self): + """ + BIST for clock lock from nsync source. + + Description: Checks to see if the motherboard can lock to the nsync + reference clock. + + External Equipment: None + + Return dictionary: + - : + - locked: Boolean lock status + + There can be multiple ref lock sensors; for a pass condition they all + need to be asserted. + """ + assert 'ref_clock_nsync' in self.tests_to_run + if self.args.dry_run: + return True, {'ref_locked': True} + result = bist.get_ref_clock_prop( + 'nsync', + 'internal', + extra_args={'addr': '169.254.0.2', 'mgmt_addr': '127.0.0.1'} + ) + return 'error_msg' not in result, result + + def bist_nsync_fabric(self): + """ + BIST for testing the fabric_clk signal from the motherboard + + Description: Checks to see if the LMK on the clocking auxiliary board + can lock to the fabric clock signal output by the motherboard. We check + this by verifying that the pri_ref signal is being used, the dpll is locked, + the apll1 is locked, and apll2 is unlocked. + + External Equipment: None + + Return dictionary: + - fabric_clk pll lock: Did the clkaux lmk lock to the fabric_clk signal + """ + assert 'nsync_fabric' in self.tests_to_run + import uhd + from uhd.usrp import multi_usrp + + try: + mcr = self.check_fpga_type_clkaux() + usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync," + "master_clock_rate={}".format(str(mcr))) + except Exception as ex: + return False, { + 'error_msg': "Failed to create usrp device: {}".format(str(ex)) + } + + mpm_c = usrp_dev.get_mpm_client() + + mpm_c.nsync_change_input_source('fabric_clk') + # The lock should happen fast, but give it half a second + time.sleep(0.5) + # Status 1 checks that the lmk is configured to use the priref signal + # True = secref, False = priref + using_pri_ref = not mpm_c.clkaux_get_nsync_status1() + # Status 0 checks dpll loss of lock + dpll_lock = not mpm_c.clkaux_get_nsync_status0() + # Reg 0xD checks several APLL statuses, we need to make sure APLL1 is + # locked and APLL2 is unlocked + apll_lock = mpm_c.peek_clkaux(0xD) == '0x8' + + result = using_pri_ref and dpll_lock and apll_lock + + mpm_c.enable_ecpri_clocks(False) + + return result, {"pri_ref_selected": using_pri_ref, + "dpll_locked": dpll_lock, + "apll1_locked_apll2_unlocked": apll_lock} + + def bist_nsync_gty(self): + """ + BIST for testing the gty_rcv_clk signal from the motherboard + + Description: Checks to see if the LMK on the clocking auxiliary board + can lock to the gty_rcv clock signal output by the motherboard. We check + this by verifying that the pri_ref signal is being used, the dpll is locked, + the apll1 is locked, and apll2 is unlocked. + + External Equipment: None + + Return dictionary: + - gty_rcv_clk pll lock: Did the clkaux lmk lock to the gty_rcv_clk signal + """ + assert 'nsync_gty' in self.tests_to_run + import uhd + from uhd.usrp import multi_usrp + + try: + mcr = self.check_fpga_type_clkaux() + usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync," + "master_clock_rate={}".format(str(mcr))) + except Exception as ex: + return False, { + 'error_msg': "Failed to create usrp device: {}".format(str(ex)) + } + + mpm_c = usrp_dev.get_mpm_client() + + mpm_c.nsync_change_input_source('gty_rcv_clk') + # The lock should happen fast, but give it half a second + time.sleep(0.5) + # Status 1 checks that the lmk is configured to use the priref signal + # True = secref, False = priref + using_pri_ref = not mpm_c.clkaux_get_nsync_status1() + # Status 0 checks dpll loss of lock + dpll_lock = not mpm_c.clkaux_get_nsync_status0() + # Reg 0xD checks several APLL statuses, we need to make sure APLL1 is + # locked and APLL2 is unlocked + apll_lock = mpm_c.peek_clkaux(0xD) == '0x8' + + result = using_pri_ref and dpll_lock and apll_lock + + mpm_c.enable_ecpri_clocks(False) + # Set the clock source back to internal, as the register values + # for locking to the gty_rcv_clk cause the mboard to lose ref_lock + # to the nsync lmk. + mpm_c.set_clock_source('internal') + + return result, {"pri_ref_selected": using_pri_ref, + "dpll_locked": dpll_lock, + "apll1_locked_apll2_unlocked": apll_lock} + + def bist_clkaux_fpga_aux_ref(self): + """ + BIST for testing the fpga_aux_ref pps source + + Description: Checks to see if the fpga_aux_ref can output a valid pps signal + + External Equipment: None + + Return dictionary: + """ + assert 'clkaux_fpga_aux_ref' in self.tests_to_run + import uhd + from uhd.usrp import multi_usrp + + try: + mcr = self.check_fpga_type_clkaux() + usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync," + "master_clock_rate={}".format(str(mcr))) + except Exception as ex: + return False, { + 'error_msg': "Failed to create usrp device: {}".format(str(ex)) + } + + mpm_c = usrp_dev.get_mpm_client() + + count = mpm_c.get_fpga_aux_ref_freq() + + # We expect a pps signal, pulse is reported in 40 MHz clock ticks, so + # 1 PPS is expected to return 40 million ticks, this gives 1% tolerance + return 39600000 < count < 40400000, {} + + def bist_nsync_rpll_config(self): + """ + BIST for testing that the LMK28PRIRefClk can be used as a source for the + motherboard RPLL + + Description: Enable the LMK on the clocking auxiliary board to output a signal + on the LMK28PRIRefClk line to the motherboard RPLL. Configure the RPLL to use + this source, and check to see if the RPLL can lock to the source. + + External Equipment: None + + Return dictionary: + """ + assert 'nsync_rpll_config' in self.tests_to_run + import uhd + from uhd.usrp import multi_usrp + from usrp_mpm.sys_utils import net + + try: + mcr = self.check_fpga_type_clkaux() + usrp_dev = multi_usrp.MultiUSRP("type=x4xx,addr=localhost,clock_source=nsync," + "master_clock_rate={}".format(str(mcr))) + except Exception as ex: + return False, { + 'error_msg': "Failed to create usrp device: {}".format(str(ex)) + } + + sfp0_addrs = net.get_iface_info('sfp0')['ip_addrs'] + + mpm_c = usrp_dev.get_mpm_client() + + mpm_c.config_rpll_to_nsync() + + ref_locked = mpm_c.get_ref_lock_sensor() + + result = ref_locked.get('value') == 'true' + + fpga_type = mpm_c.get_device_info()['fpga'] + + del usrp_dev + + # This is a brute-force way of making sure the device gets back into a clean state after + # we touched the rpll configuration + self.reload_fpga(fpga_type, sfp0_addrs) + + return result, ref_locked + + def bist_gpio(self): + """ + BIST for GPIO + Description: Writes and reads the values to the GPIO + + Needed Equipment: External loopback cable between port 0 and port 1 + + Notes: + - X410 has two FP-GPIO connectors (HDMI connectors) with 12 programmable + pins each + + Return dictionary: + - write_patterns: A list of patterns that were written + - read_patterns: A list of patterns that were read back + """ + assert 'gpio' in self.tests_to_run + if self.args.dry_run: + patterns = range(64) + return True, { + 'write_patterns': list(patterns), + 'read_patterns': list(patterns), + } + from usrp_mpm.periph_manager import x4xx, x4xx_periphs, x4xx_mb_cpld + mboard_regs_control = x4xx_periphs.MboardRegsControl( + x4xx.x4xx.mboard_regs_label, self.log) + cpld_spi_node = dt_symbol_get_spidev('mb_cpld') + cpld_control = x4xx_mb_cpld.MboardCPLD(cpld_spi_node, self.log) + gpio_diocontrol = x4xx_periphs.DioControl( + mboard_regs_control, + cpld_control, + self.log) + def _run_sub_test(inport, outport, pin_mode, voltage, pattern): + """ + Closure to run an actual test. The GPIO control object is enclosed. + + Arguments: + inport: "port" argument for DioControl, input port + outport: "port" argument for DioControl, input port + pin_mode: HDMI or DIO (see DioControl) + voltage: Valid arg for DioControl.set_voltage_level() + pattern: Bits to write to the inport, should be read back at outport + """ + gpio_diocontrol.set_port_mapping(pin_mode) + # We set all pins to be driven by the PS + # in HDMI mode not all pins can be accessed by the user + if pin_mode == "HDMI": + mask = 0xDB6D + else: + mask = 0xFFF + gpio_diocontrol.set_pin_masters(inport, mask) + gpio_diocontrol.set_pin_masters(outport, mask) + gpio_diocontrol.set_voltage_level(inport, voltage) + gpio_diocontrol.set_voltage_level(outport, voltage) + gpio_diocontrol.set_pin_directions(inport, 0x00000) + gpio_diocontrol.set_pin_directions(outport, 0xFFFFF) + gpio_diocontrol.set_pin_outputs(outport, pattern) + read_values = gpio_diocontrol.get_pin_inputs(inport) + if (pattern & mask) != read_values: + sys.stderr.write(gpio_diocontrol.status()) + return False, {'write_patterns': ["0x{:04X}".format(pattern)], + 'read_patterns': ["0x{:04X}".format(read_values)]} + return True, {'write_patterns': ["0x{:04X}".format(pattern)], + 'read_patterns': ["0x{:04X}".format(read_values)]} + # Now run tests: + for voltage in ["1V8", "2V5", "3V3"]: + for mode in ["DIO", "HDMI"]: + for pattern in [0xFFFF, 0xA5A5, 0x5A5A, 0x0000]: + sys.stderr.write("test: PortA -> PortB, {}, {}, 0x{:04X}" + .format(voltage, mode, pattern)) + status, data = _run_sub_test( + "PORTB", "PORTA", mode, voltage, pattern) + if not status: + return status, data + sys.stderr.write("test: PortB -> PortA, {}, {}, 0x{:04X}" + .format(voltage, mode, pattern)) + status, data = _run_sub_test( + "PORTA", "PORTB", mode, voltage, pattern) + if not status: + return status, data + return status, data + + + def bist_qsfp(self): + """ + BiST for QSFP status and property read out. + + Description: Tests MODSEL (write) and MODPRS (read) pin at QSFP port + and I2C communication with QSFP module. By default all + ports are tested. The user can add module to the option + argument when calling the test to select a specific + port out of [0,1]. A negative number will test all + ports (the default) + + Example: The following example will run the test on port 0: + > x4xx_bist qsfp --option module=0 + + Needed Equipment: None, for exhaustive test results the test should + be run with loopback test modules. + + Notes: The test ensures consistency of I2C communication and the + MODSEL and MODPRS pins. If MODPRS pin is active low + (module present) I2C communication must return valid values for + all QSFP properties. On the other hand when MODPRS pin is high + QSFP properties should report None as the only valid value. + The test also disables the I2C communication (unsetting MODSEL + pin) and checks that the low level I2C communication with the + module fails. The communication is reenabled after 3sec. This + window allows a tester to check whether a loopback adapter + signals the state of MODSEL correctly. + """ + + assert 'qsfp' in self.tests_to_run + if self.args.dry_run: + return True, {} + + from usrp_mpm.periph_manager import x4xx + from usrp_mpm.periph_manager.x4xx_periphs import QSFPModule + + def add_error_msg(msg): + # add error message to result map + if "error_msg" not in infos: + infos["error_msg"] = [] + infos["error_msg"].append(msg) + + def modules_to_test(): + module = self.args.option.get("module", -1) + try: + module = int(module) + except ValueError: + add_error_msg("Module '{}' is not an int".format(module)) + return None + if module < 0: + return x4xx.X400_QSFP_I2C_CONFIGS + if module not in [0, 1]: + add_error_msg("Module to test must be 0 or 1") + return None + return [x4xx.X400_QSFP_I2C_CONFIGS[module]] + + + infos = {} + modules_to_test = modules_to_test() + if not modules_to_test: + return False, infos + + result = True + + for config in modules_to_test: + qsfp = QSFPModule( + config.modprs, config.modsel, config.devsymbol, self.log) + info = {} + info["available"] = qsfp.is_available() + if info["available"]: + # if adapter is available, read out prop via I2C and + # verify they are all valid (not None) + props = [qsfp.decoded_status, + qsfp.vendor_name, + qsfp.connector_type] + for prop in props: + info[prop.__name__] = prop() + if not all(info.values()): + result = False + add_error_msg("QSFP adapter at {} is present but has " + "None property.".format(config.devsymbol)) + else: + # if adapter is not available + # reading a property *must* return None + if qsfp.connector_type() is not None: + result = False + add_error_msg("QSFP adapter at {} reports valid property " + "but is not present.".format(config.devsymbol)) + + infos[config.devsymbol] = info + + # disable i2c communication and check for failure on low level read + qsfp.enable_i2c(False) + try: + qsfp.qsfp_regs.peek8(0) + result = False + add_error_msg("Reading I2C when disabled should fail with " + "RuntimeError at {}\n".format(config.devsymbol)) + except RuntimeError: + pass + sys.stderr.write("A loopback QSFP adapter at {} should report MODSEL " + "inactive for 3 sec.".format(config.devsymbol)) + time.sleep(3) + qsfp.enable_i2c(True) + + return result, infos + + + def bist_temp(self): + """ + BIST for temperature sensors + Description: Reads the temperature sensors on the motherboards and + returns their values in mC + + Return dictionary: + - : temp in mC + """ + assert 'temp' in self.tests_to_run + if self.args.dry_run: + return True, {"DRAM PCB": 40000, + "EC Internal": 41000, + "PMBUS-0": 42000, + "PMBUS-1": 43000, + "Power Supply PCB": 44000, + "RFSoC": 45000, + "Sample Clock PCB": 46000, + "TMP464 Internal": 47000, + } + + result = bist.get_iio_temp_sensor_values() + + if len(result) < 1: + result['error_msg'] = "No temperature sensors found!" + + return 'error_msg' not in result, result + + def bist_fan(self): + """ + BIST for fans + Description: Reads the RPM values of the fans on the motherboard + + Return dictionary: + - : Fan speed in RPM + + External Equipment: None + """ + assert 'fan' in self.tests_to_run + if self.args.dry_run: + return True, {'fan0': 10000, 'fan1': 10000} + result = bist.get_ectool_fan_values() + return len(result) == 2, result + + def _db_flash_init(self, db_flash): + """ + Initialize the specified DB Flash and verify + its state + """ + db_flash.init() + if not db_flash.initialized: + raise RuntimeError() + + def _db_flash_deinit(self, db_flash): + """ + De-initialize the specified DB Flash and verify + its state + """ + db_flash.deinit() + if db_flash.initialized: + raise RuntimeError() + + def bist_spi_flash_integrity(self): + """ + BIST for SPI flash on DB + Description: Performs data integrity test on a section of + the flash memory. Stop the MPM service before running this + test using "systemctl stop usrp-hwd" command. + + External Equipment: None, but at least one daughterboard + should be installed in the X410 unit. + + Return dictionary: + + """ + assert 'spi_flash_integrity' in self.tests_to_run + if self.args.dry_run: + return True, {} + import os + from usrp_mpm.sys_utils.db_flash import DBFlash + FIXED_MEMORY_PATTERN = 'fixed' + RANDOM_MEMORY_PATTERN = 'random' + + db_id = int(self.args.option.get('db_id', DEFAULT_DB_ID)) + assert db_id in [0, 1] + + db_flash = DBFlash(db_id, log=None) + + buf_size = 100 + memory_pattern = str(self.args.option.get('memory_pattern', RANDOM_MEMORY_PATTERN)) + assert memory_pattern in (FIXED_MEMORY_PATTERN, RANDOM_MEMORY_PATTERN) + if memory_pattern == RANDOM_MEMORY_PATTERN: + buff = os.urandom(buf_size) + else: + buff = [0xA5] * buf_size + + data_valid = False + + try: + self._db_flash_init(db_flash) + except (ValueError, RuntimeError) as ex: + return False, {"error_msg": "Error while initializing flash storage: " + str(ex)} + + file_path = f'/mnt/db{db_id}_flash/test.bin' + + sys.stderr.write("Testing DB{} with {} memory pattern..".format(db_id, memory_pattern)) + with open(file_path, "wb") as f: + f.write(bytearray(buff)) + + try: + self._db_flash_deinit(db_flash) + self._db_flash_init(db_flash) + except (ValueError, RuntimeError) as ex: + return False, {"error_msg": "Error while init(deinit)ializing flash storage: " + str(ex)} + + with open(file_path, "rb") as f: + read_data = f.read() + data_valid = read_data == bytearray(buff) + os.remove(file_path) + self._db_flash_deinit(db_flash) + + return data_valid, {} + + def bist_spi_flash_speed(self): + """ + BIST for SPI flash on DB + Description: Performs read and write speed test on the SPI flash + memory on DB. Stop the MPM service before running this test + using "systemctl stop usrp-hwd" command. + + External Equipment: None, but at least one daughterboard + should be installed in the X410 unit. + + Return dictionary: + + """ + assert 'spi_flash_speed' in self.tests_to_run + if self.args.dry_run: + return True, {} + import os + import re + from usrp_mpm.sys_utils.db_flash import DBFlash + + MIN_WRITE_SPEED = 5000 #B/s + MIN_READ_SPEED = 5000 #B/s + def parse_speed(cmd_output): + mobj = re.search( + r"(.*records in\n)(.*records out\n)(.* (?P[0-9.]+) (?P\S?)B\/s)", + cmd_output) + if mobj is None: + return 0 + scale = {'': 1, 'k': 1024, 'M': 1024*1024, 'G': 1024*1024*1024} + order = mobj.group('order') + if order not in scale: + raise ValueError(f"unsupported unit '{order}B/s'") + return float(mobj.group('speed')) * scale[order] + + db_id = int(self.args.option.get('db_id', DEFAULT_DB_ID)) + assert db_id in [0, 1] + + db_flash = DBFlash(db_id, log=None) + + file_path = f'/mnt/db{db_id}_flash/test.bin' + + try: + self._db_flash_init(db_flash) + except (ValueError, RuntimeError) as ex: + return False, { + "error_msg": "Error while initializing flash storage: {}".format(str(ex)) + } + + sys.stderr.write("Testing DB{}..".format(db_id)) + write_error_msg = None + sys.stderr.write("Write Speed Test:") + cmd = [ + 'dd', + 'if=/dev/zero', + 'of=' + file_path, + 'bs=512', + 'count=1000', + 'oflag=dsync' + ] + + try: + output = subprocess.check_output( + cmd, + stderr=subprocess.STDOUT, + ) + except subprocess.CalledProcessError as ex: + output = ex.output + write_error_msg = "Error during Write Test: {}".format(output) + write_test_output = output.decode("utf-8") + sys.stderr.write(write_test_output) + + # De-init and init flash here to mitigate any effects + # caching may have on the read speed. + try: + self._db_flash_deinit(db_flash) + self._db_flash_init(db_flash) + except (ValueError, RuntimeError) as ex: + return False, { + "error_msg": "Error while init(deinit)ializing flash storage: {}".format(str(ex))} + + sys.stderr.write("Read Speed Test:") + read_error_msg = None + cmd = [ + 'dd', + 'if=' + file_path, + 'of=/dev/null', + 'bs=512', + 'count=1000', + 'oflag=dsync' + ] + + try: + output = subprocess.check_output( + cmd, + stderr=subprocess.STDOUT, + ) + except subprocess.CalledProcessError as ex: + output = ex.output + read_error_msg = "Error during Read Test: {}".format(output) + read_test_output = output.decode("utf-8") + sys.stderr.write(read_test_output) + + # Clean up. + os.remove(file_path) + self._db_flash_deinit(db_flash) + + test_status = bool(write_error_msg is None and read_error_msg is None) + + if test_status: + write_speed = parse_speed(write_test_output) + read_speed = parse_speed(read_test_output) + + if write_speed == 0: + test_status = False + write_error_msg = "Write speed parse error" + elif write_speed < MIN_WRITE_SPEED: + test_status = False + write_error_msg = \ + "Write speed {} B/s is below minimum requirement of {} B/s".format( + write_speed, MIN_WRITE_SPEED) + + if read_speed == 0: + test_status = False + read_error_msg = "Read speed parse error" + elif read_speed < MIN_READ_SPEED: + test_status = False + read_error_msg = \ + "Read speed {} B/s is below minimum requirement of {} B/s".format( + read_speed, MIN_READ_SPEED) + + return test_status, { + "Write Test": write_test_output if write_error_msg is None else write_error_msg, + "Read Test": read_test_output if read_error_msg is None else read_error_msg + } + + +############################################################################## +# main +############################################################################## +def main(): + " Go, go, go! " + return X4XXBIST().run() + +if __name__ == '__main__': + sys.exit(not main()) -- cgit v1.2.3