#!/usr/bin/env python3
#
# Copyright 2018 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
""" Test all python API functions for the connected device. """

import sys
import argparse
import numpy
import uhd
try:
    from ruamel import yaml
except ImportError:
    import yaml

# pylint: disable=broad-except


class MethodExecutor(object):
    """
    Object for executing tests. Handles returned errors and maintains tested &
    failed lists.
    """
    def __init__(self):
        # Names of all API methods for which a test was attempted
        self.tested = []
        # Names of all API methods whose test raised an exception
        self.failed = []

    def execute_methods(self, method_names, method_callback):
        """
        Execute API methods in 'method_names' by calling 'method_callback'.

        The callback must throw an exception if something went wrong. If no
        exception is thrown, the assumption is that all went fine.

        method_names -- List of method names that the callback exercises.
        method_callback -- Callable (taking no arguments) that runs the test.

        Returns True if the callback returned without raising, False otherwise.
        """
        self.tested.extend(method_names)
        method_names_str = ", ".join([method + "()" for method in method_names])
        print(f"Executing methods: {method_names_str}")
        try:
            method_callback()
        except Exception as ex:
            # Deliberately broad: any failure is recorded, and the remaining
            # tests are still run.
            print(f"Error while executing methods: \n`{method_names_str}`:\n{ex}",
                  file=sys.stderr)
            self.failed.extend(method_names)
            return False
        return True


def chan_test(usrp, prop, num_chans, error_handling, get_range, arg_converter=None):
    """
    Test methods that take channel number as input.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_chans -- Integer for number of channels.
    error_handling -- coerce or throw, depending on expected results.
    get_range -- Either the name (string) of the get_range method, or a
                 two-element list [min, max] with the valid range.
    arg_converter -- Optional callable that converts a value into the type
                     expected by the setter (e.g. a TuneRequest).

    Returns True on success, raises an exception otherwise.
    """
    getter = 'get_{}'.format(prop)
    setter = 'set_{}'.format(prop)
    if arg_converter is None:
        # No conversion required, pass values straight through
        arg_converter = lambda x: x
    for chan in range(num_chans):
        initial_value = getattr(usrp, getter)(chan)
        # Test value below, above, and within range
        # If a get_range function is passed in:
        try:
            prop_range = getattr(usrp, get_range)(chan)
            min_val = prop_range.start() - 10
            max_val = prop_range.stop() + 10
            mid_point = prop_range.clip((min_val + max_val) / 2, True)
        # get_range isn't a function name, it's a list (getattr() raises
        # TypeError for a non-string attribute name).
        except TypeError:
            min_val = get_range[0] - 10
            max_val = get_range[1] + 10
            mid_point = (get_range[0] + get_range[1]) / 2
        # Values may need to be converted (e.g. to TuneRequest).
        min_val = arg_converter(min_val)
        max_val = arg_converter(max_val)
        # If setter is expected to throw errors, we set some invalid values and
        # verify we get an exception:
        if error_handling == 'throw':
            try:
                getattr(usrp, setter)(min_val, chan)
            except RuntimeError:
                pass
            else:
                raise Exception(f'error found in min test of {prop}')
            try:
                getattr(usrp, setter)(max_val, chan)
            except RuntimeError:
                pass
            else:
                raise Exception(f'error found in max test of {prop}')
        # If setter implements error coercion, then we should be able to set
        # invalid values without an exception occurring:
        elif error_handling == 'coerce':
            getattr(usrp, setter)(min_val, chan)
            getattr(usrp, setter)(max_val, chan)
        # Set acceptable value.
        getattr(usrp, setter)(arg_converter(mid_point), chan)
        # Check if the actual value is within range of set value
        get_value = float(getattr(usrp, getter)(chan))
        if not numpy.isclose(get_value, mid_point, 0.005):
            raise Exception(
                f'Error found in setting acceptable value in {prop}.\n'
                f'Expected {mid_point}, got {get_value}.')
        # Put it back the way we found it
        getattr(usrp, setter)(initial_value, chan)
    return True


def lo_name_test(usrp, prop, num_chans, get_range):
    """
    Test methods that have an lo_name string as an argument.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_chans -- Integer for number of channels.
    get_range -- String for the get_range function.

    Returns True on success, raises an exception otherwise.
    """
    getter = 'get_{}'.format(prop)
    setter = 'set_{}'.format(prop)
    lo_names_getter = \
        'get_tx_lo_names' if prop == 'tx_lo_freq' else 'get_rx_lo_names'
    # For each channel, get lo_names.
    for chan in range(num_chans):
        lo_names = getattr(usrp, lo_names_getter)(chan)
        # For each lo_name, set the minimum, the maximum, and a value within
        # range. None of these may throw.
        for lo_name in lo_names:
            initial_value = getattr(usrp, getter)(lo_name, chan)
            prop_range = getattr(usrp, get_range)(lo_name, chan)
            min_val = prop_range.start()
            max_val = prop_range.stop()
            mid_point = prop_range.clip((min_val + max_val) / 2, True)
            try:
                getattr(usrp, setter)(min_val, lo_name, chan)
            except RuntimeError as ex:
                raise Exception(f'error found in min test of {prop}') from ex
            try:
                getattr(usrp, setter)(max_val, lo_name, chan)
            except RuntimeError as ex:
                raise Exception(f'error found in max test of {prop}') from ex
            getattr(usrp, setter)(mid_point, lo_name, chan)
            # Check if the actual value is within range of set value
            get_value = float(getattr(usrp, getter)(lo_name, chan))
            if not numpy.isclose(mid_point, get_value, 0.005):
                raise Exception(
                    f'Error found in setting acceptable value in {prop} '
                    f'for LO {lo_name} on channel {chan}.\n'
                    f'Expected value: {mid_point} Actual: {get_value}')
            # Put it back the way we found it
            getattr(usrp, setter)(initial_value, lo_name, chan)
    return True


def range_test(usrp, prop, num_chans, error_handling=None,
               args_type='chan', arg_converter=None, get_range=None):
    """
    Function to perform range_tests using getrange, getters, and setters.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_chans -- Integer for number of channels.
    error_handling -- coerce or throw, depending on expected results.
    args_type -- The type of argument that must be passed into the function.
                 Possible values:
                 'chan' means the argument is a channel number.
                 'lo_name' means it is an LO name.
    arg_converter -- Optional callable that converts values to the type
                     expected by the setter.
    get_range -- Name of the get_range method, or a [min, max] list. Defaults
                 to 'get_<prop>_range'.

    Returns True on success, raises an exception otherwise.
    """
    assert error_handling in ('coerce', 'throw')
    assert args_type in ('chan', 'lo_name')
    if get_range is None:
        get_range = 'get_{}_range'.format(prop)
    if args_type == 'chan':
        return chan_test(usrp, prop, num_chans,
                         error_handling, get_range, arg_converter)
    # args_type == 'lo_name':
    return lo_name_test(usrp, prop, num_chans, get_range)


def discrete_options_test(usrp, prop, num_chans, error_handling=None):
    """
    Function to perform tests on methods that return list of possible
    discrete values (floats).

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_chans -- Integer for number of channels.
    error_handling -- coerce or throw, depending on expected results.

    Returns True on success, raises an exception otherwise.
    """
    assert error_handling in ('coerce', 'throw')
    # The method listing all possible values is the pluralized getter
    get_range = 'get_{}s'.format(prop)
    return chan_test(usrp, prop, num_chans, error_handling, get_range)


def list_test(usrp, prop, error_handling, post_hook=None, safe_values=None):
    """
    Function to perform tests on methods that return lists of possible
    discrete values (strings).

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    error_handling -- coerce or throw, depending on expected results.
    post_hook -- Callback to call unconditionally after executing test (e.g. to
                 reset something)
    safe_values -- A collection of safe values that can be tested from the
                   range. Values not in this collection are skipped. If empty
                   or None, all values are tested.

    Returns True on success, raises an exception otherwise.
    """
    assert error_handling in ('coerce', 'throw')
    # The following functions have different get_range functions.
    if 'gain_profile' in prop:
        get_range = 'get_{}_names'.format(prop)
    else:
        get_range = 'get_{}s'.format(prop)
    getter = 'get_{}'.format(prop)
    setter = 'set_{}'.format(prop)
    # lo_source function does not take int as argument
    if 'lo_source' in prop:
        names = getattr(usrp, get_range)()
    else:
        names = getattr(usrp, get_range)(0)
    try:
        # Try to set every possible value.
        for name in names:
            if safe_values and name not in safe_values:
                print(f"Skipping value `{name}' for prop {prop}, considered unsafe.")
                continue
            initial_value = getattr(usrp, getter)(0)
            try:
                getattr(usrp, setter)(name)
            except RuntimeError as ex:
                raise Exception('error found in setting {} to {} => {}'
                                .format(prop, name, str(ex))) from ex
            # Check if get function returns set value.
            get_value = getattr(usrp, getter)(0)
            if get_value != name:
                raise Exception('Error in setting acceptable value in {}'
                                .format(prop))
            getattr(usrp, setter)(initial_value)
    finally:
        # The hook is documented as unconditional, so run it on failure too.
        if post_hook:
            post_hook()
    return True


def get_test(usrp, prop, num_chans):
    """
    For testing only get methods that have no set counterpart.

    The value returned by the getter is compared against the corresponding
    entry of the get_usrp_{rx,tx}_info() dictionary.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_chans -- Integer for number of channels.

    Returns True on success, raises an exception otherwise.
    """
    getter = 'get_{}'.format(prop)
    for chan in range(num_chans):
        # Refresh dictionary for each channel
        if "rx" in prop:
            dictionary = usrp.get_usrp_rx_info(chan)
        else:
            dictionary = usrp.get_usrp_tx_info(chan)
        if prop == 'mboard_name':
            value = getattr(usrp, getter)(0)
            # get_mboard_name function maps to mboard_id field in dict.
            dict_value = dictionary['mboard_id']
        else:
            value = getattr(usrp, getter)(chan)
            dict_value = dictionary[prop]
        if value != dict_value:
            raise Exception("value in dict is {} and callback value is {}"
                            .format(dict_value, value))
    return True


def recv_num_samps(usrp):
    """
    Test recv_num_samps method.

    usrp -- Device object to run tests on.

    Returns True on success, raises an exception otherwise.
    """
    num_samps = 1000
    # NOTE(review): the RX rate is passed as the second positional argument of
    # recv_num_samps() -- confirm against the MultiUSRP API that this is the
    # intended parameter.
    rate = usrp.get_rx_rate()
    samples = usrp.recv_num_samps(num_samps, rate)
    # Check that number of samples returned is correct.
    if len(samples[0]) != num_samps:
        raise Exception("Number of samples received is not number requested.")
    return True


def send_waveform(usrp):
    """
    Test send_waveform method.

    usrp -- Device object to run tests on.

    Returns True on success.
    """
    # NOTE(review): this reads the RX rate for a TX call and passes it as the
    # third positional argument of send_waveform() -- confirm against the
    # MultiUSRP API that this is intended.
    rate = usrp.get_rx_rate()
    usrp.send_waveform(numpy.asarray([1, 0, 1, 0], dtype=numpy.complex64), 5, rate)
    return True


def test_sensor_api(usrp, sensor_type, num_indices):
    """
    Test the sensor API. It consists of two API calls ("get_sensor_names" and
    "get_sensor"). It will read out all the sensors and print their values.
    This will work for the mboard and TX/RX sensors.

    usrp -- Device object to run tests on.
    sensor_type -- String used to build the method names ('mboard', 'rx', or
                   'tx').
    num_indices -- Integer denoting the number of indices (channels or
                   motherboards) to query sensors for.
    """
    list_sensor_method = 'get_{}_sensor_names'.format(sensor_type)
    get_sensor_method = 'get_{}_sensor'.format(sensor_type)
    for sensor_index in range(num_indices):
        sensor_names = getattr(usrp, list_sensor_method)(sensor_index)
        for sensor_name in sensor_names:
            print(f"Reading sensor: {sensor_name} ({sensor_index}/{num_indices}): ")
            sensor_value = getattr(usrp, get_sensor_method)(
                sensor_name, sensor_index
            )
            print(str(sensor_value))


def test_time(usrp, prop):
    """
    Perform tests on functions that take time as an argument.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.

    Returns True on success.
    """
    # Use a time 10 units past the last PPS as the value to set
    time = usrp.get_time_last_pps()
    time = time + 10
    getattr(usrp, prop)(time)
    return True


def set_subdev_spec(usrp, prop):
    """
    Function for performing test on set_subdev_spec methods.

    Reads the current subdev spec and writes it back unchanged.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.

    Returns True on success.
    """
    if prop == 'set_rx_subdev_spec':
        spec = usrp.get_rx_subdev_spec()
    else:
        spec = usrp.get_tx_subdev_spec()
    getattr(usrp, prop)(spec)
    return True


def mboard_range_test(usrp, prop, num_mboards):
    """
    Execute tests on methods that need to be tested on a range of motherboards.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_mboards -- Integer value of number of motherboards.

    Returns True on success, raises an exception if any call returns None.
    """
    for mboard in range(num_mboards):
        if getattr(usrp, prop)(mboard) is None:
            raise Exception("{} function with argument {} returns None".
                            format(prop, mboard))
    return True


def chan_range_test(usrp, prop, num_chans):
    """
    Execute tests on methods that need to be tested on a range of channels.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_chans -- Integer value of number of channels.

    Returns True on success.
    """
    # IQ balance setters are called with 1, everything else (e.g.
    # rx_dc_offset and tx_dc_offset) is set to 0 for testing purposes.
    value = 1 if prop in ('set_rx_iq_balance', 'set_tx_iq_balance') else 0
    for chan in range(num_chans):
        getattr(usrp, prop)(value, chan)
    return True


def gpio_attr_test(usrp, num_mboards):
    """
    Perform tests for get_gpio_attr and set_gpio_attr.

    Reads the 'CTRL' attribute of every GPIO bank and writes it back.

    usrp -- Device object to run tests on.
    num_mboards -- Integer value of number of motherboards.

    Returns True on success.
    """
    for mboard in range(num_mboards):
        banks = usrp.get_gpio_banks(mboard)
        for bank in banks:
            # NOTE(review): the motherboard index is not passed on to
            # get_gpio_attr()/set_gpio_attr() here -- confirm whether it
            # should be.
            value = usrp.get_gpio_attr(bank, 'CTRL')
            usrp.set_gpio_attr(bank, 'CTRL', value)
    return True


def iq_balance_test(usrp, prop, num_chans):
    """
    Function for testing rx and tx iq_balance functions.

    usrp -- Device object to run tests on.
    prop -- String of function to be tested.
    num_chans -- Integer value of number of channels.

    Returns True on success.
    """
    for chan in range(num_chans):
        getattr(usrp, prop)(1, chan)
    return True


def filter_test(usrp, prop, num_chans):
    """
    Test specifically for the get_filter function

    usrp -- Device object to run tests on.
    prop -- Direction string ('rx' or 'tx') used to build the method names.
    num_chans -- Integer value of number of channels.

    Returns True on success, raises an exception otherwise.
    """
    get_names = 'get_{}_filter_names'.format(prop)
    get_filter = 'get_{}_filter'.format(prop)
    set_filter = 'set_{}_filter'.format(prop)
    for chan in range(num_chans):
        filters = getattr(usrp, get_names)(chan)
        for filter_name in filters:
            # Read a filter object...
            filter_obj = getattr(usrp, get_filter)(filter_name, chan)
            if filter_obj is None:
                raise Exception("Filter object for {} returns None"
                                .format(filter_name))
            # ... and write it back:
            try:
                getattr(usrp, set_filter)(filter_name, filter_obj, chan)
            except RuntimeError as ex:
                # A "can not be written" error is tolerated; anything else is
                # a real failure.
                if "can not be written" not in str(ex):
                    raise
    return True


def tree_test(usrp):
    """
    Test prop tree access

    usrp -- Device object to run tests on.

    Returns True on success.
    """
    tree = usrp.get_tree()
    name = tree.access_str("/name").get()
    print("Property tree got name: " + name)
    return True


def _expect_power_api_failure(call_desc, callback, *args):
    """
    Call callback(*args) and raise an exception if it does NOT fail.

    call_desc -- String describing the call, used in the error message.
    callback -- Callable that is expected to raise.
    args -- Positional arguments for the callback.
    """
    try:
        callback(*args)
    except Exception:
        return
    raise Exception(
        f"{call_desc}: Expected exception (no power cal), none occurred.")


def power_test(usrp, direction, num_chans):
    """
    Test the power reference API. If we don't have a power cal API available,
    then we check it fails.

    usrp -- Device object to run tests on.
    direction -- String, 'rx' or 'tx'.
    num_chans -- Integer value of number of channels.

    Returns True on success, raises an exception otherwise.
    """
    has_cb = getattr(usrp, f'has_{direction}_power_reference')
    get_range_cb = getattr(usrp, f'get_{direction}_power_range')
    get_cb = getattr(usrp, f'get_{direction}_power_reference')
    set_cb = getattr(usrp, f'set_{direction}_power_reference')
    for chan in range(num_chans):
        if not has_cb(chan):
            # Without power cal data, all of these calls must fail.
            _expect_power_api_failure(
                f"get_{direction}_power_range({chan})", get_range_cb, chan)
            _expect_power_api_failure(
                f"get_{direction}_power_reference({chan})", get_cb, chan)
            _expect_power_api_failure(
                f"set_{direction}_power_reference({chan})", set_cb, 100, chan)
            continue
        # Now check power API for reals:
        initial_value = getattr(usrp, f'get_{direction}_gain')(chan)
        # Test value below, above, and within range
        prop_range = get_range_cb(chan)
        min_val = prop_range.start() - 10
        max_val = prop_range.stop() + 10
        mid_point = prop_range.clip((min_val + max_val) / 2, True)
        # These should not throw
        set_cb(min_val, chan)
        set_cb(max_val, chan)
        # Set acceptable value in the middle
        set_cb(mid_point, chan)
        # Check if the actual value is within range of set value
        actual_value = get_cb(chan)
        # We'll allow a pretty big variance, the power coercion has a lot of
        # constraints
        if not numpy.isclose(actual_value, mid_point, 10):
            raise Exception(
                f'Error found in setting midpoint power value for {direction}.\n'
                f'Expected {mid_point}, got {actual_value}.')
        # Put it back the way we found it
        getattr(usrp, f'set_{direction}_gain')(initial_value, chan)
    return True


def run_api_test(usrp, device_config):
    """
    Name functions to be tested.

    usrp -- device object to run tests on
    device_config -- Dictionary that contains further configuration for various
                     tests.

    Returns True if all executed tests passed, False otherwise.
    """
    num_rx_chans = device_config.get('num_rx_channels', usrp.get_rx_num_channels())
    num_tx_chans = device_config.get('num_tx_channels', usrp.get_tx_num_channels())
    num_mboards = usrp.get_num_mboards()

    method_executor = MethodExecutor()

    # Append functions already called or will be called implicitly.
    method_executor.tested.extend((
        'make',
        'issue_stream_cmd',  # Gets called by recv_num_samps
        'get_rx_num_channels',  # Got called above
        'get_tx_num_channels',  # Got called above
        'get_usrp_rx_info',
        'get_usrp_tx_info',
        'get_num_mboards',  # Got called above
        'get_rx_stream', 'get_tx_stream',  # Require stream_args_t
    ))
    method_executor.tested.extend(device_config.get('imply_tested', []))

    # Each entry is (list of API method names covered, zero-argument callable)
    actual_tests = [
        (['get_rx_freq', 'set_rx_freq', 'get_rx_freq_range'],
         lambda: range_test(usrp, 'rx_freq', num_rx_chans, 'coerce',
                            arg_converter=uhd.types.TuneRequest)),
        (['get_rx_gain', 'set_rx_gain', 'get_rx_gain_range'],
         lambda: range_test(usrp, 'rx_gain', num_rx_chans, 'coerce')),
        (['get_rx_bandwidth', 'set_rx_bandwidth', 'get_rx_bandwidth_range'],
         lambda: range_test(usrp, 'rx_bandwidth', num_rx_chans, 'coerce')),
        (['get_rx_lo_freq', 'set_rx_lo_freq', 'get_rx_lo_freq_range'],
         lambda: range_test(usrp, 'rx_lo_freq', num_rx_chans, 'coerce',
                            args_type='lo_name')),
        (['get_rx_rate', 'set_rx_rate', 'get_rx_rates'],
         lambda: discrete_options_test(usrp, 'rx_rate', num_rx_chans, 'coerce')),
        (['get_time_source', 'set_time_source', 'get_time_sources'],
         lambda: list_test(usrp, 'time_source', 'coerce',
                           safe_values=device_config.get('time_sources',
                                                         ('internal',)))),
        (['get_clock_source', 'set_clock_source', 'get_clock_sources'],
         lambda: list_test(usrp, 'clock_source', 'coerce',
                           safe_values=device_config.get('clock_sources',
                                                         ('internal',)))),
        # NOTE(review): this entry runs list_test() on 'clock_source', not
        # 'sync_source', and the default safe_values is a dict, so membership
        # is checked against the dict's keys. Left unchanged; confirm the
        # intended behaviour against the sync source API.
        (['get_sync_source', 'set_sync_source', 'get_sync_sources'],
         lambda: list_test(
             usrp, 'clock_source', 'coerce',
             safe_values=device_config.get('sync_sources', ({
                 'clock_source': 'internal',
                 'time_source': 'internal',
             })))),
        (['get_rx_antenna', 'set_rx_antenna', 'get_rx_antennas'],
         lambda: list_test(usrp, 'rx_antenna', 'coerce')),
        (['get_tx_antenna', 'set_tx_antenna', 'get_tx_antennas'],
         lambda: list_test(usrp, 'tx_antenna', 'coerce')),
        (['get_rx_lo_source', 'set_rx_lo_source', 'get_rx_lo_source_names'],
         lambda: list_test(usrp, 'rx_lo_source', error_handling='throw')),
        (['get_normalized_rx_gain', 'set_normalized_rx_gain'],
         lambda: range_test(usrp, 'normalized_rx_gain', num_rx_chans,
                            error_handling='throw', get_range=[0, 1])),
        (['get_rx_subdev_name'],
         lambda: get_test(usrp, 'rx_subdev_name', num_rx_chans)),
        (['get_rx_subdev_spec'],
         usrp.get_rx_subdev_spec),
        (['get_tx_freq', 'set_tx_freq', 'get_tx_freq_range'],
         lambda: range_test(usrp, 'tx_freq', num_tx_chans, 'coerce',
                            arg_converter=uhd.types.TuneRequest)),
        (['get_tx_gain', 'set_tx_gain', 'get_tx_gain_range'],
         lambda: range_test(usrp, 'tx_gain', num_tx_chans, 'coerce')),
        (['get_tx_bandwidth', 'set_tx_bandwidth', 'get_tx_bandwidth_range'],
         lambda: range_test(usrp, 'tx_bandwidth', num_tx_chans, 'coerce')),
        (['get_tx_lo_freq', 'set_tx_lo_freq', 'get_tx_lo_freq_range'],
         lambda: range_test(usrp, 'tx_lo_freq', num_tx_chans, 'coerce',
                            args_type='lo_name')),
        (['get_tx_rate', 'set_tx_rate', 'get_tx_rates'],
         lambda: discrete_options_test(usrp, 'tx_rate', num_tx_chans, 'coerce')),
        (['get_tx_lo_source', 'set_tx_lo_source', 'get_tx_lo_names'],
         lambda: list_test(usrp, 'tx_lo_source', 'coerce')),
        (['get_normalized_tx_gain', 'set_normalized_tx_gain'],
         lambda: range_test(usrp, 'normalized_tx_gain', num_tx_chans,
                            error_handling='throw', get_range=[0, 1])),
        (['get_tx_subdev_name'],
         lambda: get_test(usrp, "tx_subdev_name", num_tx_chans)),
        (['get_tx_subdev_spec'],
         usrp.get_tx_subdev_spec),
        (['get_mboard_name'],
         lambda: get_test(usrp, 'mboard_name', num_tx_chans)),
        (['recv_num_samps'],
         lambda: recv_num_samps(usrp)),
        (['send_waveform'],
         lambda: send_waveform(usrp)),
        (['get_tx_gain_profile', 'set_tx_gain_profile', 'get_tx_gain_profile_names'],
         lambda: list_test(usrp, 'tx_gain_profile', 'coerce')),
        (['get_rx_gain_profile', 'set_rx_gain_profile', 'get_rx_gain_profile_names'],
         lambda: list_test(usrp, 'rx_gain_profile', 'coerce')),
        (['get_master_clock_rate', 'set_master_clock_rate',
          'get_master_clock_rate_range'],
         lambda: range_test(usrp, 'master_clock_rate', 1, 'coerce')),
        (['get_mboard_sensor_names', 'get_mboard_sensor'],
         lambda: test_sensor_api(usrp, 'mboard', num_mboards)),
        (['get_tx_sensor_names', 'get_tx_sensor'],
         lambda: test_sensor_api(usrp, 'tx', num_tx_chans)),
        (['get_rx_sensor_names', 'get_rx_sensor'],
         lambda: test_sensor_api(usrp, 'rx', num_rx_chans)),
        (['set_time_next_pps'],
         lambda: test_time(usrp, "set_time_next_pps")),
        (['set_time_now'],
         lambda: test_time(usrp, "set_time_now")),
        (['set_rx_subdev_spec'],
         lambda: set_subdev_spec(usrp, "set_rx_subdev_spec")),
        (['set_tx_subdev_spec'],
         lambda: set_subdev_spec(usrp, "set_tx_subdev_spec")),
        (['get_gpio_banks'],
         lambda: mboard_range_test(usrp, "get_gpio_banks", num_mboards)),
        (['get_time_now'],
         lambda: mboard_range_test(usrp, "get_time_now", num_mboards)),
        (['get_time_last_pps'],
         lambda: mboard_range_test(usrp, "get_time_last_pps", num_mboards)),
        (['set_rx_dc_offset'],
         lambda: chan_range_test(usrp, "set_rx_dc_offset", num_rx_chans)),
        (['set_tx_dc_offset'],
         lambda: chan_range_test(usrp, "set_tx_dc_offset", num_tx_chans)),
        (['set_rx_agc'],
         lambda: chan_range_test(usrp, "set_rx_agc", num_rx_chans)),
        (['get_gpio_attr', 'set_gpio_attr', 'get_gpio_banks'],
         lambda: gpio_attr_test(usrp, num_mboards)),
        (['get_fe_rx_freq_range'], usrp.get_fe_rx_freq_range),
        (['get_fe_tx_freq_range'], usrp.get_fe_tx_freq_range),
        (['get_pp_string'], usrp.get_pp_string),
        (['get_rx_gain_names'], usrp.get_rx_gain_names),
        (['get_rx_lo_export_enabled'], usrp.get_rx_lo_export_enabled),
        (['get_rx_lo_names'], usrp.get_rx_lo_names),
        (['get_rx_lo_sources'], usrp.get_rx_lo_sources),
        (['get_time_synchronized'], usrp.get_time_synchronized),
        (['get_tx_gain_names'], usrp.get_tx_gain_names),
        (['get_tx_lo_export_enabled'], usrp.get_tx_lo_export_enabled),
        (['get_tx_lo_sources'], usrp.get_tx_lo_sources),
        (['set_rx_iq_balance'],
         lambda: iq_balance_test(usrp, "set_rx_iq_balance", num_rx_chans)),
        (['set_tx_iq_balance'],
         lambda: iq_balance_test(usrp, "set_tx_iq_balance", num_tx_chans)),
        (['get_rx_filter', 'set_rx_filter', 'get_rx_filter_names'],
         lambda: filter_test(usrp, "rx", num_rx_chans)),
        (['get_tx_filter', 'set_tx_filter', 'get_tx_filter_names'],
         lambda: filter_test(usrp, "tx", num_tx_chans)),
        (['clear_command_time'], usrp.clear_command_time),
        (['get_tree'], lambda: tree_test(usrp)),
        (['has_rx_power_reference',
          'set_rx_power_reference',
          'get_rx_power_reference',
          'get_rx_power_range',
         ],
         lambda: power_test(usrp, 'rx', num_rx_chans),
        ),
        (['has_tx_power_reference',
          'set_tx_power_reference',
          'get_tx_power_reference',
          'get_tx_power_range',
         ],
         lambda: power_test(usrp, 'tx', num_tx_chans),
        ),
    ]

    # List of tests that we don't test, but that's OK.
    white_listed = ['set_rx_lo_export_enabled',  # Not supported w/all devices.
                    'set_tx_lo_export_enabled',  # Not supported w/all devices.
                    'set_time_source_out',  # Not supported on all devices.
                    'get_register_info',  # Requires path to register
                    # Need register path, but enumerate_registers returns None
                    'read_register', 'write_register',
                    'set_command_time',  # Time_spec_t required
                    'set_filter',  # Requires passing sptr into function
                    'set_user_register',  # Not always callable
                    'set_clock_source_out',
                    'get_tx_dboard_iface',
                    'get_rx_dboard_iface',
                    'set_time_unknown_pps',
                    'get_radio_control',
                    'get_mb_controller',
                    'get_mpm_client',
                   ]

    blacklist = device_config.get('skip', [])
    success = True

    # ...then we can run them all through the executor like this:
    for method_names, test_callback in actual_tests:
        # Skip the whole test if any of its methods is on the skip list
        if any(method in blacklist for method in method_names):
            continue
        if not method_executor.execute_methods(method_names, test_callback):
            success = False

    # Report every public attribute of the device object that was neither
    # tested nor explicitly exempted.
    untested = [
        test + (" [blacklisted]" if test in blacklist else "")
        for test in dir(usrp)
        if test not in method_executor.tested
        and test not in white_listed
        and not test.startswith('_')
    ]

    if method_executor.failed:
        print("The following API calls caused failures:")
        print("* " + "\n* ".join(method_executor.failed))
    if untested:
        print("The following functions were not tested:")
        print("* " + "\n* ".join(untested))

    return success


def parse_args():
    """
    Parse args.

    Returns the argparse namespace with the attributes 'args',
    'dump_defaults', and 'device_config'.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--args',
        default='',
    )
    parser.add_argument(
        '--dump-defaults',
        help="Specify a device type, and the default config will be dumped as YAML"
    )
    parser.add_argument(
        '--device-config',
        help="Specify path to YAML file to use as device config"
    )
    return parser.parse_args()


def get_device_config(usrp_type, device_config_path=None):
    """
    Return a device configuration object.

    usrp_type -- String identifying the device type (e.g. 'B210', 'x410').
    device_config_path -- Optional path to a YAML file. If given, its content
                          is returned and usrp_type is ignored.

    Returns a dictionary. It is empty if no defaults exist for usrp_type.
    """
    if device_config_path:
        with open(device_config_path, 'r') as yaml_f:
            # The config holds plain data only. safe_load() avoids executing
            # arbitrary YAML tags, and plain load() without a Loader is
            # rejected by current PyYAML versions.
            return yaml.safe_load(yaml_f)
    if usrp_type in ('B205mini', 'B200mini', 'B200', 'B210'):
        return {
            'skip': [
                'set_rx_lo_export_enabled',
                'set_tx_lo_export_enabled',
                'get_rx_lo_source',
                'set_rx_lo_source',
                'get_rx_lo_source_names',
                'get_tx_lo_source',
                'set_tx_lo_source',
                'get_tx_lo_names',
                'get_master_clock_rate',
                'set_master_clock_rate',
                'get_master_clock_rate_range',
                'get_gpio_attr',
                'set_gpio_attr',
                'get_gpio_banks',
            ],
        }
    if usrp_type == 'x410':
        return {
            'skip': [
                # No AGC on ZBX
                'set_rx_agc',
                # No IQ imbalance on ZBX
                'set_rx_iq_balance',
                'set_tx_iq_balance',
                # No DC offset on ZBX
                'set_rx_dc_offset',
                'set_tx_dc_offset',
                # No LO source control on ZBX
                'set_rx_lo_source',
                'set_tx_lo_source',
                'set_rx_lo_export_enabled',
                'set_tx_lo_export_enabled',
            ],
            'clock_sources': ['internal', 'mboard'],
        }
    return {}


def dump_defaults(usrp_type):
    """
    Print the hard-coded defaults as YAML

    usrp_type -- String identifying the device type.
    """
    defaults = get_device_config(usrp_type)
    print(yaml.dump(defaults, default_flow_style=False))


def main():
    """
    Returns True on Success
    """
    args = parse_args()
    if args.dump_defaults:
        dump_defaults(args.dump_defaults)
        # Dumping the defaults is a success: the caller exits with
        # `not main()`, so a truthy value is required for exit status 0.
        return True
    usrp = uhd.usrp.MultiUSRP(args.args)
    usrp_type = usrp.get_usrp_rx_info().get('mboard_id')
    device_config = get_device_config(usrp_type, args.device_config)
    ret_val = run_api_test(usrp, device_config)
    if not ret_val:
        raise Exception("Python API Tester Received Errors")
    return ret_val


if __name__ == "__main__":
    sys.exit(not main())