diff options
author | Martin Braun <martin.braun@ettus.com> | 2021-05-07 09:47:15 +0200 |
---|---|---|
committer | Aaron Rossetto <aaron.rossetto@ni.com> | 2021-05-10 15:02:24 -0500 |
commit | 147527a75d71098a56945dc3fe751016f79f4a86 (patch) | |
tree | 9efe0d8c62cf6472488710f89907977a1e59efc7 /host | |
parent | 8a33db6022e56b3a94033695fe5c2da3b476a7e2 (diff) | |
download | uhd-147527a75d71098a56945dc3fe751016f79f4a86.tar.gz uhd-147527a75d71098a56945dc3fe751016f79f4a86.tar.bz2 uhd-147527a75d71098a56945dc3fe751016f79f4a86.zip |
tests: Update multi_usrp_test.py
Many updates to this test. Most tests weren't even working properly.
Highlights:
- Add device-specific configuration. Includes defaults, but also the
option to specify a YAML file.
- Improved output for better readability
- Made a whole bunch of tests work
Diffstat (limited to 'host')
-rwxr-xr-x | host/tests/devtest/multi_usrp_test.py | 374 |
1 files changed, 259 insertions, 115 deletions
diff --git a/host/tests/devtest/multi_usrp_test.py b/host/tests/devtest/multi_usrp_test.py index ba17337a6..b9add8381 100755 --- a/host/tests/devtest/multi_usrp_test.py +++ b/host/tests/devtest/multi_usrp_test.py @@ -5,11 +5,17 @@ # SPDX-License-Identifier: GPL-3.0-or-later # """ Test all python API functions for the connected device. """ + import sys import argparse import numpy import uhd +try: + from ruamel import yaml +except: + import yaml +# pylint: disable=broad-except class MethodExecutor(object): """ @@ -31,20 +37,19 @@ class MethodExecutor(object): method_callback -- String containing the method to call. """ self.tested.extend(method_names) - print("Executing {}".format(method_names)) + method_names_str = ", ".join([method + "()" for method in method_names]) + print(f"Executing methods: {method_names_str}") try: - method_callback() is True + method_callback() except Exception as ex: - print("Error while executing `{}`: {}".format( - method_names, str(ex) - ), file=sys.stderr) + print(f"Error while executing methods: \n`{method_names_str}`:\n{ex}", + file=sys.stderr) self.failed.extend(method_names) return False return True -def chan_test(usrp, prop, num_chans, error_handling, get_range, - arg_converter=None): +def chan_test(usrp, prop, num_chans, error_handling, get_range, arg_converter=None): """ Test methods that take channel number as input. usrp -- Device object to run tests on. @@ -57,30 +62,28 @@ def chan_test(usrp, prop, num_chans, error_handling, get_range, getter = 'get_{}'.format(prop) setter = 'set_{}'.format(prop) for chan in range(num_chans): + initial_value = getattr(usrp, getter)(chan) # Test value below, above, and within range # If a get_range function is passed in: try: prop_range = getattr(usrp, get_range)(chan) min_val = prop_range.start() - 10 max_val = prop_range.stop() + 10 + mid_point = prop_range.clip((min_val + max_val) / 2, True) # get_range isn't a function, its a list. except TypeError: min_val = get_range[0] - 10 max_val = get_range[1] + 10 - mid_point = (min_val + max_val) / 2 + mid_point = (get_range[0] + get_range[1]) / 2 # Values must be converted to TuneRequest type for these functions. arg_converter = arg_converter if arg_converter is not None \ else lambda x: x min_val = arg_converter(min_val) max_val = arg_converter(max_val) - mid_point = arg_converter(mid_point) - # If setter is expected to throw errors. + # If setter is expected to throw errors, we set some invalid values and + # verify we get an exception: if error_handling == 'throw': - # get a couple of values inside and outside range - # apply using setter - # read using getter - # compare with expected behavior try: getattr(usrp, setter)(min_val, chan) except RuntimeError: @@ -93,28 +96,28 @@ def chan_test(usrp, prop, num_chans, error_handling, get_range, pass else: raise Exception('error found in max test of ', prop) - getattr(usrp, setter)(mid_point, chan) - # If setter implements error coercion + # If setter implements error coercion, then we should be able to set + # invalid values without an exception occurring: elif error_handling == 'coerce': - getattr(usrp, setter)(min_val, chan) getattr(usrp, setter)(max_val, chan) # Set acceptable value. - getattr(usrp, setter)(mid_point, chan) + getattr(usrp, setter)(arg_converter(mid_point), chan) # Check if the actual value is within range of set value - get_value = getattr(usrp, getter)(chan) - get_value = float(get_value) - mid_point = (prop_range.start() + prop_range.stop()) / 2 + get_value = float(getattr(usrp, getter)(chan)) if not numpy.isclose(get_value, mid_point, 0.005): - raise Exception('error found in setting acceptable value in {}'. - format(prop)) + raise Exception( + f'Error found in setting acceptable value in {prop}.\n' + f'Expected {mid_point}, got {get_value}.') + # Put it back the way we found it + getattr(usrp, setter)(initial_value, chan) return True def lo_name_test(usrp, prop, num_chans, get_range): """ - Test methods that an lo_name string as an argument. + Test methods that have an lo_name string as an argument. usrp -- Device object to run tests on. prop -- String of function to be tested. num_chans -- Integer for number of channels. @@ -131,26 +134,31 @@ def lo_name_test(usrp, prop, num_chans, get_range): # For each lo_name, set a value below minimum, # above maximum, and within range. for lo_name in lo_names: + initial_value = getattr(usrp, getter)(lo_name, chan) prop_range = getattr(usrp, get_range)(lo_name, chan) - min_val = prop_range.start() - 10 - max_val = prop_range.stop() + 10 - mid_point = (min_val + max_val) / 2 + min_val = prop_range.start() + max_val = prop_range.stop() + mid_point = \ + prop_range.clip( + (prop_range.start() + prop_range.stop()) / 2, True) try: - getattr(usrp, setter)(min_val, chan) + getattr(usrp, setter)(min_val, lo_name, chan) except RuntimeError: raise Exception('error found in min test of ', prop) try: - getattr(usrp, setter)(max_val, chan) + getattr(usrp, setter)(max_val, lo_name, chan) except RuntimeError: raise Exception('error found in max test of ', prop) - getattr(usrp, setter)(mid_point, chan) + getattr(usrp, setter)(mid_point, lo_name, chan) # Check if the actual value is within range of set value - get_value = getattr(usrp, getter)(chan) - get_value = float(get_value) - mid_point = (prop_range.start() + prop_range.stop()) / 2 + get_value = float(getattr(usrp, getter)(lo_name, chan)) if not numpy.isclose(mid_point, get_value, 0.005): - raise Exception('error found in setting acceptable value in ', - prop) + raise Exception( + f'Error found in setting acceptable value in {prop} ' + f'for LO {lo_name} on channel {chan}.\n' + f'Expected value: {mid_point} Actual: {get_value}') + # Put it back the way we found it + getattr(usrp, setter)(initial_value, lo_name, chan) return True @@ -158,21 +166,24 @@ def range_test(usrp, prop, num_chans, error_handling=None, args_type='chan', arg_converter=None, get_range=None): """ Function to perform range_tests using getrange, getters, and setters. - usrp -- Device object to run tests on. - prop -- String of function to be tested. - num_chans -- Integer for number of channels. - error_handling -- coercer or throw, depending on expected results. - args_type -- The type of argument that must be passed into the function. - arg_converter -- String for type to convert values to. - get_range -- String for get_range function + usrp -- Device object to run tests on. + prop -- String of function to be tested. + num_chans -- Integer for number of channels. + error_handling -- coerce or throw, depending on expected results. + args_type -- The type of argument that must be passed into the function. + Possible values: 'chan' means the argument is a channel number. + 'lo_name' means it is an LO name. + arg_converter -- String for type to convert values to. + get_range -- String for get_range function """ assert error_handling in ('coerce', 'throw') + assert args_type in ('chan', 'lo_name') if get_range is None: get_range = 'get_{}_range'.format(prop) if args_type == 'chan': to_ret = chan_test(usrp, prop, num_chans, error_handling, get_range, arg_converter) - else: + else: # args_type == 'lo_name': to_ret = lo_name_test(usrp, prop, num_chans, get_range) return to_ret @@ -188,20 +199,21 @@ def discrete_options_test(usrp, prop, num_chans, error_handling -- coercer or throw, depending on expected results. """ assert error_handling in ('coerce', 'throw') - get_range = 'get_{}s'.format(prop) # Generate all possible set values - return chan_test(usrp, prop, num_chans, error_handling, - get_range) + return chan_test(usrp, prop, num_chans, error_handling, get_range) -def list_test(usrp, prop, error_handling, post_hook=None): +def list_test(usrp, prop, error_handling, post_hook=None, safe_values=None): """ Function to perform tests on methods that return lists of possible discrete values (strings). usrp -- Device object to run tests on. prop -- String of function to be tested. error_handling -- coercer or throw, depending on expected results. + post_hook -- Callback to call unconditionally after executing test (e.g. to + reset something) + safe_values -- A list of safe values that can be tested from the range. """ assert error_handling in ('coerce', 'throw') # The following functions have different get_range functions. @@ -217,9 +229,10 @@ def list_test(usrp, prop, error_handling, post_hook=None): else getattr(usrp, get_range)() # Try to set every possible value. for name in names: - # GPSDO may not be connected. - if name in ('gpsdo', 'internal'): + if safe_values and name not in safe_values: + print(f"Skipping value `{name}' for prop {prop}, considered unsafe.") continue + initial_value = getattr(usrp, getter)(0) try: getattr(usrp, setter)(name) except RuntimeError as ex: @@ -230,6 +243,7 @@ def list_test(usrp, prop, error_handling, post_hook=None): if get_value != name: raise Exception('Error in setting acceptable value in {}' .format(prop)) + getattr(usrp, setter)(initial_value) if post_hook: post_hook() return True @@ -355,8 +369,7 @@ def chan_range_test(usrp, prop, num_chans): prop -- String of function to be tested. num_chans -- Integer value of number of channels. """ - # The following functions do not require the mboard index. - if prop == 'set_rx_iq_balance' or prop == 'set_tx_iq_balance': + if prop in ('set_rx_iq_balance', 'set_tx_iq_balance'): for chan in range(0, num_chans): getattr(usrp, prop)(1, chan) else: @@ -366,22 +379,18 @@ def chan_range_test(usrp, prop, num_chans): return True -def gpio_attr_test(usrp, prop, num_mboards): +def gpio_attr_test(usrp, num_mboards): """ Perform tests for get_gpio_attr and set_gpio_attr. usrp -- Device object to run tests on. - prop -- String of function to be tested. num_mboards -- Integer value of number of motherboards. """ - if prop == 'get_gpio_attr': - for mboard in range(0, num_mboards): - bank = getattr(usrp, 'get_gpio_banks')(mboard) - getattr(usrp, prop)(bank[0], 'CTRL') - elif prop == 'set_gpio_attr': - if num_mboards > 0: - bank = getattr(usrp, 'get_gpio_banks')(0) - getattr(usrp, prop)(bank[0], 'CTRL', 0) + for mboard in range(0, num_mboards): + banks = usrp.get_gpio_banks(mboard) + for bank in banks: + value = usrp.get_gpio_attr(bank, 'CTRL') + usrp.set_gpio_attr(bank, 'CTRL', value) return True @@ -420,55 +429,139 @@ def filter_test(usrp, prop, num_chans): return True -def run_api_test(usrp): +def tree_test(usrp): + """ + Test prop tree access + """ + tree = usrp.get_tree() + name = tree.access_str("/name").get() + print("Property tree got name: " + name) + return True + + +def power_test(usrp, direction, num_chans): + """ + Test the power reference API. If we don't have a power cal API available, + then we check it fails. + """ + has_cb = getattr(usrp, f'has_{direction}_power_reference') + get_range_cb = getattr(usrp, f'get_{direction}_power_range') + get_cb = getattr(usrp, f'get_{direction}_power_reference') + set_cb = getattr(usrp, f'set_{direction}_power_reference') + for chan in range(num_chans): + # pylint: disable=bare-except + if not has_cb(chan): + try: + get_range_cb(chan) + except: + pass + else: + raise(f"get_{direction}_power_range({chan}): " + "Expected exception (no power cal), none occurred.") + try: + get_cb(chan) + except: + pass + else: + raise(f"get_{direction}_power_reference({chan}): " + "Expected exception (no power cal), none occurred.") + try: + set_cb(100, chan) + except: + pass + else: + raise(f"set_{direction}_power_reference({chan}): " + "Expected exception (no power cal), none occurred.") + continue + # pylint: enable=bare-except + # Now check power API for reals: + initial_value = getattr(usrp, f'get_{direction}_gain')(chan) + # Test value below, above, and within range + prop_range = get_range_cb(chan) + min_val = prop_range.start() - 10 + max_val = prop_range.stop() + 10 + mid_point = prop_range.clip((min_val + max_val) / 2, True) + # These should not throw + set_cb(min_val, chan) + set_cb(max_val, chan) + # Set acceptable value in the middle + set_cb(mid_point, chan) + # Check if the actual value is within range of set value + actual_value = get_cb(chan) + # We'll allow a pretty big variance, the power coercion has a lot of + # constraints + if not numpy.isclose(actual_value, mid_point, 10): + raise Exception( + f'Error found in setting midpoint power value for {direction}.\n' + f'Expected {mid_point}, got {actual_value}.') + # Put it back the way we found it + getattr(usrp, f'set_{direction}_gain')(initial_value, chan) + return True + +def run_api_test(usrp, device_config): """ Name functions to be tested. usrp -- device object to run tests on + device_config -- Dictionary that contains further configuration for various + tests. """ - num_rx_chans = usrp.get_rx_num_channels() - num_tx_chans = usrp.get_tx_num_channels() - + num_rx_chans = device_config.get('num_rx_channels', usrp.get_rx_num_channels()) + num_tx_chans = device_config.get('num_tx_channels', usrp.get_tx_num_channels()) num_mboards = usrp.get_num_mboards() method_executor = MethodExecutor() # Append functions already called or will be called implicitly. - method_executor.tested.append('get_rx_num_channels') - method_executor.tested.append('get_tx_num_channels') - method_executor.tested.append('get_usrp_rx_info') - method_executor.tested.append('get_usrp_tx_info') - method_executor.tested.append('get_num_mboards') + method_executor.tested.extend(( + 'make', + 'issue_stream_cmd', # Gets called by recv_num_samps + 'get_rx_num_channels', # Got called above + 'get_tx_num_channels', # Got called above + 'get_usrp_rx_info', + 'get_usrp_tx_info', + 'get_num_mboards', # Got called above + 'get_rx_stream', + 'get_tx_stream', # Require stream_args_t + )) + method_executor.tested.extend(device_config.get('imply_tested', [])) actual_tests = [ (['get_rx_freq', 'set_rx_freq', 'get_rx_freq_range'], lambda: range_test(usrp, 'rx_freq', num_rx_chans, 'coerce', - uhd.types.TuneRequest)), + arg_converter=uhd.types.TuneRequest)), (['get_rx_gain', 'set_rx_gain', 'get_rx_gain_range'], lambda: range_test(usrp, 'rx_gain', num_rx_chans, 'coerce')), (['get_rx_bandwidth', 'set_rx_bandwidth', 'get_rx_bandwidth_range'], lambda: range_test(usrp, 'rx_bandwidth', num_rx_chans, 'coerce')), (['get_rx_lo_freq', 'set_rx_lo_freq', 'get_rx_lo_freq_range'], lambda: range_test(usrp, 'rx_lo_freq', num_rx_chans, 'coerce', - 'lo_name')), + args_type='lo_name')), (['get_rx_rate', 'set_rx_rate', 'get_rx_rates'], lambda: discrete_options_test(usrp, 'rx_rate', num_rx_chans, 'coerce')), - (['get_time_source', 'set_time_source', 'get_time_source_names'], + (['get_time_source', 'set_time_source', 'get_time_sources'], lambda: list_test(usrp, 'time_source', 'coerce', - lambda: usrp.set_time_source('internal'))), - (['get_clock_source', 'set_clock_source', 'get_clock_names'], - lambda: list_test(usrp, 'clock_source', 'coerce')), - (['get_rx_antenna', 'set_rx_antenna', 'get_rx_antenna_names'], + safe_values=device_config.get('time_sources', ('internal')))), + (['get_clock_source', 'set_clock_source', 'get_clock_sources'], + lambda: list_test(usrp, 'clock_source', 'coerce', + safe_values=device_config.get('clock_sources', ('internal')))), + (['get_sync_source', 'set_sync_source', 'get_sync_sources'], + lambda: list_test( + usrp, 'clock_source', 'coerce', + safe_values=device_config.get('sync_sources', ({ + 'clock_source': 'internal', + 'time_source': 'internal', + })))), + (['get_rx_antenna', 'set_rx_antenna', 'get_rx_antennas'], lambda: list_test(usrp, 'rx_antenna', 'coerce')), - (['get_tx_antenna', 'set_tx_antenna', 'get_tx_antenna_names'], + (['get_tx_antenna', 'set_tx_antenna', 'get_tx_antennas'], lambda: list_test(usrp, 'tx_antenna', 'coerce')), (['get_rx_lo_source', 'set_rx_lo_source', 'get_rx_lo_source_names'], - lambda: list_test(usrp, 'rx_lo_source', 'coerce')), - (['get_normalized_rx_gain', 'set_normalized_rx_gain', - 'get_normalied_rx_gain_range'], + lambda: list_test(usrp, 'rx_lo_source', error_handling='throw')), + (['get_normalized_rx_gain', 'set_normalized_rx_gain'], lambda: range_test(usrp, 'normalized_rx_gain', num_rx_chans, - 'coerce', [0, 1])), + error_handling='throw', get_range=[0, 1])), (['get_rx_subdev_name'], lambda: get_test(usrp, 'rx_subdev_name', num_rx_chans)), (['get_rx_subdev_spec'], @@ -476,7 +569,7 @@ def run_api_test(usrp): (['get_tx_freq', 'set_tx_freq', 'get_tx_freq_range'], lambda: range_test(usrp, 'tx_freq', num_tx_chans, 'coerce', - uhd.types.TuneRequest)), + arg_converter=uhd.types.TuneRequest)), (['get_tx_gain', 'set_tx_gain', 'get_tx_gain_range'], lambda: range_test(usrp, 'tx_gain', num_tx_chans, 'coerce')), (['get_tx_bandwidth', 'set_tx_bandwidth', 'get_tx_bandwidth_range'], @@ -489,9 +582,9 @@ def run_api_test(usrp): 'coerce')), (['get_tx_lo_source', 'set_tx_lo_source', 'get_tx_lo_names'], lambda: list_test(usrp, 'tx_lo_source', 'coerce')), - (['get_normalized_tx_gain', 'set_normalized_tx_gain', 'get_normalized_tx_gain_range'], + (['get_normalized_tx_gain', 'set_normalized_tx_gain'], lambda: range_test(usrp, 'normalized_tx_gain', num_rx_chans, - 'coerce', [0, 1])), + error_handling='throw', get_range=[0, 1])), (['get_tx_subdev_name'], lambda: get_test(usrp, "tx_subdev_name", num_tx_chans)), (['get_tx_subdev_spec'], @@ -507,7 +600,7 @@ def run_api_test(usrp): (['get_rx_gain_profile', 'set_rx_gain_profile', 'get_rx_gain_profile_names'], lambda: list_test(usrp, 'rx_gain_profile', 'coerce')), (['get_master_clock_rate', 'set_master_clock_rate', 'get_master_clock_rate_range'], - lambda: range_test(usrp, 'master_clock_rate', 1, 'throw')), + lambda: range_test(usrp, 'master_clock_rate', 1, 'coerce')), (['get_mboard_sensor_names', 'get_mboard_sensor'], lambda: test_sensor_api(usrp, 'mboard', num_mboards)), (['get_tx_sensor_names', 'get_tx_sensor'], @@ -534,23 +627,16 @@ def run_api_test(usrp): lambda: chan_range_test(usrp, "set_tx_dc_offset", num_tx_chans)), (['set_rx_agc'], lambda: chan_range_test(usrp, "set_rx_agc", num_rx_chans)), - (['get_gpio_attr'], - lambda: gpio_attr_test(usrp, "get_gpio_attr", num_mboards)), - (['set_gpio_attr'], - lambda: gpio_attr_test(usrp, "set_gpio_attr", num_mboards)), + (['get_gpio_attr', 'set_gpio_attr', 'get_gpio_banks'], + lambda: gpio_attr_test(usrp, num_mboards)), (['get_fe_rx_freq_range'], usrp.get_fe_rx_freq_range), (['get_fe_tx_freq_range'], usrp.get_fe_tx_freq_range), - (['get_normalized_tx_gain'], usrp.get_normalized_tx_gain), (['get_pp_string'], usrp.get_pp_string), - (['get_rx_antennas'], usrp.get_rx_antennas), (['get_rx_gain_names'], usrp.get_rx_gain_names), (['get_rx_lo_export_enabled'], usrp.get_rx_lo_export_enabled), (['get_rx_lo_names'], usrp.get_rx_lo_names), (['get_rx_lo_sources'], usrp.get_rx_lo_sources), - (['get_time_sources'], - lambda: mboard_range_test(usrp, "get_time_sources", num_mboards)), (['get_time_synchronized'], usrp.get_time_synchronized), - (['get_tx_antennas'], usrp.get_tx_antennas), (['get_tx_gain_names'], usrp.get_tx_gain_names), (['get_tx_lo_export_enabled'], usrp.get_tx_lo_export_enabled), (['get_tx_lo_sources'], usrp.get_tx_lo_sources), @@ -558,27 +644,31 @@ def run_api_test(usrp): lambda: iq_balance_test(usrp, "set_rx_iq_balance", num_rx_chans)), (['set_tx_iq_balance'], lambda: iq_balance_test(usrp, "set_tx_iq_balance", num_tx_chans)), - (['get_clock_sources'], - lambda: mboard_range_test(usrp, "get_clock_sources", num_mboards)), (['get_rx_filter', 'set_rx_filter', 'get_rx_filter_names'], lambda: filter_test(usrp, "rx", num_rx_chans)), (['get_tx_filter', 'set_tx_filter', 'get_tx_filter_names'], lambda: filter_test(usrp, "tx", num_rx_chans)), (['clear_command_time'], usrp.clear_command_time), + (['get_tree'], lambda: tree_test(usrp)), + (['has_rx_power_reference', + 'set_rx_power_reference', + 'get_rx_power_reference', + 'get_rx_power_range', + ], lambda: power_test(usrp, 'rx', num_rx_chans), + ), + (['has_tx_power_reference', + 'set_tx_power_reference', + 'get_tx_power_reference', + 'get_tx_power_range', + ], lambda: power_test(usrp, 'rx', num_rx_chans), + ), ] - white_listed = ['__class__', '__delattr__', '__dict__', '__doc__', - '__format__', '__getattribute__', '__hash__', '__init__', - '__module__', '__new__', '__reduce__', '__reduce_ex__', - '__repr__', '__setattr__', '__sizeof__', '__str__', - '__subclasshook__', '__weakref__', - 'make', - 'issue_stream_cmd', - 'set_rx_lo_export_enabled', # Not supported w/all devices. + # List of tests that we don't test, but that's OK. + white_listed = ['set_rx_lo_export_enabled', # Not supported w/all devices. 'set_tx_lo_export_enabled', # Not supported w/all devices. 'set_time_source_out', # Not supported on all devices. 'get_register_info', # Requires path to register - 'get_rx_stream', 'get_tx_stream', # Require stream_args_t # Need register path, but enumerate_registers returns None 'read_register', 'write_register', 'set_command_time', # Time_spec_t required @@ -590,24 +680,31 @@ def run_api_test(usrp): 'set_time_unknown_pps', 'get_radio_control', 'get_mb_controller', + 'get_mpm_client', ] + blacklist = device_config.get('skip', []) success = True # ...then we can run them all through the executor like this: for method_names, test_callback in actual_tests: + if any(method in blacklist for method in method_names): + continue if not method_executor.execute_methods(method_names, test_callback): success = False - untested = [test for test in dir(usrp) if test not in - method_executor.tested and test not in white_listed] + untested = [ + test + (" [blacklisted]" if test in blacklist else "") for test in dir(usrp) + if test not in method_executor.tested \ + and test not in white_listed \ + and not test.startswith('_') + ] if method_executor.failed: print("The following API calls caused failures:") - print(method_executor.failed) - + print("* " + "\n* ".join(method_executor.failed)) if untested: print("The following functions were not tested:") - print(untested) + print("* " + "\n* ".join(untested)) return success @@ -618,22 +715,69 @@ def parse_args(): """ parser = argparse.ArgumentParser() parser.add_argument( - '--args', - default='None' + '--args', default='', + ) + parser.add_argument( + '--dump-defaults', + help="Specify a device type, and the default config will be dumped as YAML" + ) + parser.add_argument( + '--device-config', + help="Specify path to YAML file to use as device config" ) return parser.parse_args() +def get_device_config(usrp_type, device_config_path=None): + """ + Return a device configuration object. + """ + if device_config_path: + with open(device_config_path, 'r') as yaml_f: + return yaml.load(yaml_f) + if usrp_type in ('B205mini', 'B200mini', 'B200', 'B210'): + return { + 'skip': [ + 'set_rx_lo_export_enabled', + 'set_tx_lo_export_enabled', + 'get_rx_lo_source', + 'set_rx_lo_source', + 'get_rx_lo_source_names', + 'get_tx_lo_source', + 'set_tx_lo_source', + 'get_tx_lo_names', + 'get_master_clock_rate', + 'set_master_clock_rate', + 'get_master_clock_rate_range', + 'get_gpio_attr', + 'set_gpio_attr', + 'get_gpio_banks', + ], + } + return {} + +def dump_defaults(usrp_type): + """ + Print the hard-coded defaults as YAML + """ + defaults = get_device_config(usrp_type) + print(yaml.dump(defaults, default_flow_style=False)) + def main(): """ Returns True on Success """ args = parse_args() + if args.dump_defaults: + dump_defaults(args.dump_defaults) + return 0 usrp = uhd.usrp.MultiUSRP(args.args) - ret_val = run_api_test(usrp) + usrp_type = usrp.get_usrp_rx_info().get('mboard_id') + device_config = get_device_config(usrp_type, args.device_config) + ret_val = run_api_test(usrp, device_config) if ret_val != 1: raise Exception("Python API Tester Received Errors") return ret_val if __name__ == "__main__": - exit(not main()) + sys.exit(not main()) |