aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2021-05-07 09:47:15 +0200
committerAaron Rossetto <aaron.rossetto@ni.com>2021-05-10 15:02:24 -0500
commit147527a75d71098a56945dc3fe751016f79f4a86 (patch)
tree9efe0d8c62cf6472488710f89907977a1e59efc7 /host
parent8a33db6022e56b3a94033695fe5c2da3b476a7e2 (diff)
downloaduhd-147527a75d71098a56945dc3fe751016f79f4a86.tar.gz
uhd-147527a75d71098a56945dc3fe751016f79f4a86.tar.bz2
uhd-147527a75d71098a56945dc3fe751016f79f4a86.zip
tests: Update multi_usrp_test.py
Many updates to this test. Most tests weren't even working properly. Highlights: - Add device-specific configuration. Includes defaults, but also the option to specify a YAML file. - Improved output for better readability - Made a whole bunch of tests work
Diffstat (limited to 'host')
-rwxr-xr-xhost/tests/devtest/multi_usrp_test.py374
1 files changed, 259 insertions, 115 deletions
diff --git a/host/tests/devtest/multi_usrp_test.py b/host/tests/devtest/multi_usrp_test.py
index ba17337a6..b9add8381 100755
--- a/host/tests/devtest/multi_usrp_test.py
+++ b/host/tests/devtest/multi_usrp_test.py
@@ -5,11 +5,17 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
""" Test all python API functions for the connected device. """
+
import sys
import argparse
import numpy
import uhd
+try:
+ from ruamel import yaml
+except:
+ import yaml
+# pylint: disable=broad-except
class MethodExecutor(object):
"""
@@ -31,20 +37,19 @@ class MethodExecutor(object):
method_callback -- String containing the method to call.
"""
self.tested.extend(method_names)
- print("Executing {}".format(method_names))
+ method_names_str = ", ".join([method + "()" for method in method_names])
+ print(f"Executing methods: {method_names_str}")
try:
- method_callback() is True
+ method_callback()
except Exception as ex:
- print("Error while executing `{}`: {}".format(
- method_names, str(ex)
- ), file=sys.stderr)
+ print(f"Error while executing methods: \n`{method_names_str}`:\n{ex}",
+ file=sys.stderr)
self.failed.extend(method_names)
return False
return True
-def chan_test(usrp, prop, num_chans, error_handling, get_range,
- arg_converter=None):
+def chan_test(usrp, prop, num_chans, error_handling, get_range, arg_converter=None):
"""
Test methods that take channel number as input.
usrp -- Device object to run tests on.
@@ -57,30 +62,28 @@ def chan_test(usrp, prop, num_chans, error_handling, get_range,
getter = 'get_{}'.format(prop)
setter = 'set_{}'.format(prop)
for chan in range(num_chans):
+ initial_value = getattr(usrp, getter)(chan)
# Test value below, above, and within range
# If a get_range function is passed in:
try:
prop_range = getattr(usrp, get_range)(chan)
min_val = prop_range.start() - 10
max_val = prop_range.stop() + 10
+ mid_point = prop_range.clip((min_val + max_val) / 2, True)
# get_range isn't a function, its a list.
except TypeError:
min_val = get_range[0] - 10
max_val = get_range[1] + 10
- mid_point = (min_val + max_val) / 2
+ mid_point = (get_range[0] + get_range[1]) / 2
# Values must be converted to TuneRequest type for these functions.
arg_converter = arg_converter if arg_converter is not None \
else lambda x: x
min_val = arg_converter(min_val)
max_val = arg_converter(max_val)
- mid_point = arg_converter(mid_point)
- # If setter is expected to throw errors.
+ # If setter is expected to throw errors, we set some invalid values and
+ # verify we get an exception:
if error_handling == 'throw':
- # get a couple of values inside and outside range
- # apply using setter
- # read using getter
- # compare with expected behavior
try:
getattr(usrp, setter)(min_val, chan)
except RuntimeError:
@@ -93,28 +96,28 @@ def chan_test(usrp, prop, num_chans, error_handling, get_range,
pass
else:
raise Exception('error found in max test of ', prop)
- getattr(usrp, setter)(mid_point, chan)
- # If setter implements error coercion
+ # If setter implements error coercion, then we should be able to set
+ # invalid values without an exception occurring:
elif error_handling == 'coerce':
-
getattr(usrp, setter)(min_val, chan)
getattr(usrp, setter)(max_val, chan)
# Set acceptable value.
- getattr(usrp, setter)(mid_point, chan)
+ getattr(usrp, setter)(arg_converter(mid_point), chan)
# Check if the actual value is within range of set value
- get_value = getattr(usrp, getter)(chan)
- get_value = float(get_value)
- mid_point = (prop_range.start() + prop_range.stop()) / 2
+ get_value = float(getattr(usrp, getter)(chan))
if not numpy.isclose(get_value, mid_point, 0.005):
- raise Exception('error found in setting acceptable value in {}'.
- format(prop))
+ raise Exception(
+ f'Error found in setting acceptable value in {prop}.\n'
+ f'Expected {mid_point}, got {get_value}.')
+ # Put it back the way we found it
+ getattr(usrp, setter)(initial_value, chan)
return True
def lo_name_test(usrp, prop, num_chans, get_range):
"""
- Test methods that an lo_name string as an argument.
+ Test methods that have an lo_name string as an argument.
usrp -- Device object to run tests on.
prop -- String of function to be tested.
num_chans -- Integer for number of channels.
@@ -131,26 +134,31 @@ def lo_name_test(usrp, prop, num_chans, get_range):
# For each lo_name, set a value below minimum,
# above maximum, and within range.
for lo_name in lo_names:
+ initial_value = getattr(usrp, getter)(lo_name, chan)
prop_range = getattr(usrp, get_range)(lo_name, chan)
- min_val = prop_range.start() - 10
- max_val = prop_range.stop() + 10
- mid_point = (min_val + max_val) / 2
+ min_val = prop_range.start()
+ max_val = prop_range.stop()
+ mid_point = \
+ prop_range.clip(
+ (prop_range.start() + prop_range.stop()) / 2, True)
try:
- getattr(usrp, setter)(min_val, chan)
+ getattr(usrp, setter)(min_val, lo_name, chan)
except RuntimeError:
raise Exception('error found in min test of ', prop)
try:
- getattr(usrp, setter)(max_val, chan)
+ getattr(usrp, setter)(max_val, lo_name, chan)
except RuntimeError:
raise Exception('error found in max test of ', prop)
- getattr(usrp, setter)(mid_point, chan)
+ getattr(usrp, setter)(mid_point, lo_name, chan)
# Check if the actual value is within range of set value
- get_value = getattr(usrp, getter)(chan)
- get_value = float(get_value)
- mid_point = (prop_range.start() + prop_range.stop()) / 2
+ get_value = float(getattr(usrp, getter)(lo_name, chan))
if not numpy.isclose(mid_point, get_value, 0.005):
- raise Exception('error found in setting acceptable value in ',
- prop)
+ raise Exception(
+ f'Error found in setting acceptable value in {prop} '
+ f'for LO {lo_name} on channel {chan}.\n'
+ f'Expected value: {mid_point} Actual: {get_value}')
+ # Put it back the way we found it
+ getattr(usrp, setter)(initial_value, lo_name, chan)
return True
@@ -158,21 +166,24 @@ def range_test(usrp, prop, num_chans, error_handling=None,
args_type='chan', arg_converter=None, get_range=None):
"""
Function to perform range_tests using getrange, getters, and setters.
- usrp -- Device object to run tests on.
- prop -- String of function to be tested.
- num_chans -- Integer for number of channels.
- error_handling -- coercer or throw, depending on expected results.
- args_type -- The type of argument that must be passed into the function.
- arg_converter -- String for type to convert values to.
- get_range -- String for get_range function
+ usrp -- Device object to run tests on.
+ prop -- String of function to be tested.
+ num_chans -- Integer for number of channels.
+ error_handling -- coerce or throw, depending on expected results.
+ args_type -- The type of argument that must be passed into the function.
+ Possible values: 'chan' means the argument is a channel number.
+ 'lo_name' means it is an LO name.
+ arg_converter -- String for type to convert values to.
+ get_range -- String for get_range function
"""
assert error_handling in ('coerce', 'throw')
+ assert args_type in ('chan', 'lo_name')
if get_range is None:
get_range = 'get_{}_range'.format(prop)
if args_type == 'chan':
to_ret = chan_test(usrp, prop, num_chans,
error_handling, get_range, arg_converter)
- else:
+ else: # args_type == 'lo_name':
to_ret = lo_name_test(usrp, prop, num_chans, get_range)
return to_ret
@@ -188,20 +199,21 @@ def discrete_options_test(usrp, prop, num_chans,
error_handling -- coercer or throw, depending on expected results.
"""
assert error_handling in ('coerce', 'throw')
-
get_range = 'get_{}s'.format(prop)
# Generate all possible set values
- return chan_test(usrp, prop, num_chans, error_handling,
- get_range)
+ return chan_test(usrp, prop, num_chans, error_handling, get_range)
-def list_test(usrp, prop, error_handling, post_hook=None):
+def list_test(usrp, prop, error_handling, post_hook=None, safe_values=None):
"""
Function to perform tests on methods that return lists of possible
discrete values (strings).
usrp -- Device object to run tests on.
prop -- String of function to be tested.
error_handling -- coercer or throw, depending on expected results.
+ post_hook -- Callback to call unconditionally after executing test (e.g. to
+ reset something)
+ safe_values -- A list of safe values that can be tested from the range.
"""
assert error_handling in ('coerce', 'throw')
# The following functions have different get_range functions.
@@ -217,9 +229,10 @@ def list_test(usrp, prop, error_handling, post_hook=None):
else getattr(usrp, get_range)()
# Try to set every possible value.
for name in names:
- # GPSDO may not be connected.
- if name in ('gpsdo', 'internal'):
+ if safe_values and name not in safe_values:
+ print(f"Skipping value `{name}' for prop {prop}, considered unsafe.")
continue
+ initial_value = getattr(usrp, getter)(0)
try:
getattr(usrp, setter)(name)
except RuntimeError as ex:
@@ -230,6 +243,7 @@ def list_test(usrp, prop, error_handling, post_hook=None):
if get_value != name:
raise Exception('Error in setting acceptable value in {}'
.format(prop))
+ getattr(usrp, setter)(initial_value)
if post_hook:
post_hook()
return True
@@ -355,8 +369,7 @@ def chan_range_test(usrp, prop, num_chans):
prop -- String of function to be tested.
num_chans -- Integer value of number of channels.
"""
- # The following functions do not require the mboard index.
- if prop == 'set_rx_iq_balance' or prop == 'set_tx_iq_balance':
+ if prop in ('set_rx_iq_balance', 'set_tx_iq_balance'):
for chan in range(0, num_chans):
getattr(usrp, prop)(1, chan)
else:
@@ -366,22 +379,18 @@ def chan_range_test(usrp, prop, num_chans):
return True
-def gpio_attr_test(usrp, prop, num_mboards):
+def gpio_attr_test(usrp, num_mboards):
"""
Perform tests for get_gpio_attr and set_gpio_attr.
usrp -- Device object to run tests on.
- prop -- String of function to be tested.
num_mboards -- Integer value of number of motherboards.
"""
- if prop == 'get_gpio_attr':
- for mboard in range(0, num_mboards):
- bank = getattr(usrp, 'get_gpio_banks')(mboard)
- getattr(usrp, prop)(bank[0], 'CTRL')
- elif prop == 'set_gpio_attr':
- if num_mboards > 0:
- bank = getattr(usrp, 'get_gpio_banks')(0)
- getattr(usrp, prop)(bank[0], 'CTRL', 0)
+ for mboard in range(0, num_mboards):
+ banks = usrp.get_gpio_banks(mboard)
+ for bank in banks:
+ value = usrp.get_gpio_attr(bank, 'CTRL')
+ usrp.set_gpio_attr(bank, 'CTRL', value)
return True
@@ -420,55 +429,139 @@ def filter_test(usrp, prop, num_chans):
return True
-def run_api_test(usrp):
+def tree_test(usrp):
+ """
+ Test prop tree access
+ """
+ tree = usrp.get_tree()
+ name = tree.access_str("/name").get()
+ print("Property tree got name: " + name)
+ return True
+
+
+def power_test(usrp, direction, num_chans):
+ """
+ Test the power reference API. If we don't have a power cal API available,
+ then we check it fails.
+ """
+ has_cb = getattr(usrp, f'has_{direction}_power_reference')
+ get_range_cb = getattr(usrp, f'get_{direction}_power_range')
+ get_cb = getattr(usrp, f'get_{direction}_power_reference')
+ set_cb = getattr(usrp, f'set_{direction}_power_reference')
+ for chan in range(num_chans):
+ # pylint: disable=bare-except
+ if not has_cb(chan):
+ try:
+ get_range_cb(chan)
+ except:
+ pass
+ else:
+ raise(f"get_{direction}_power_range({chan}): "
+ "Expected exception (no power cal), none occurred.")
+ try:
+ get_cb(chan)
+ except:
+ pass
+ else:
+ raise(f"get_{direction}_power_reference({chan}): "
+ "Expected exception (no power cal), none occurred.")
+ try:
+ set_cb(100, chan)
+ except:
+ pass
+ else:
+ raise(f"set_{direction}_power_reference({chan}): "
+ "Expected exception (no power cal), none occurred.")
+ continue
+ # pylint: enable=bare-except
+ # Now check power API for reals:
+ initial_value = getattr(usrp, f'get_{direction}_gain')(chan)
+ # Test value below, above, and within range
+ prop_range = get_range_cb(chan)
+ min_val = prop_range.start() - 10
+ max_val = prop_range.stop() + 10
+ mid_point = prop_range.clip((min_val + max_val) / 2, True)
+ # These should not throw
+ set_cb(min_val, chan)
+ set_cb(max_val, chan)
+ # Set acceptable value in the middle
+ set_cb(mid_point, chan)
+ # Check if the actual value is within range of set value
+ actual_value = get_cb(chan)
+ # We'll allow a pretty big variance, the power coercion has a lot of
+ # constraints
+ if not numpy.isclose(actual_value, mid_point, 10):
+ raise Exception(
+ f'Error found in setting midpoint power value for {direction}.\n'
+ f'Expected {mid_point}, got {actual_value}.')
+ # Put it back the way we found it
+ getattr(usrp, f'set_{direction}_gain')(initial_value, chan)
+ return True
+
+def run_api_test(usrp, device_config):
"""
Name functions to be tested.
usrp -- device object to run tests on
+ device_config -- Dictionary that contains further configuration for various
+ tests.
"""
- num_rx_chans = usrp.get_rx_num_channels()
- num_tx_chans = usrp.get_tx_num_channels()
-
+ num_rx_chans = device_config.get('num_rx_channels', usrp.get_rx_num_channels())
+ num_tx_chans = device_config.get('num_tx_channels', usrp.get_tx_num_channels())
num_mboards = usrp.get_num_mboards()
method_executor = MethodExecutor()
# Append functions already called or will be called implicitly.
- method_executor.tested.append('get_rx_num_channels')
- method_executor.tested.append('get_tx_num_channels')
- method_executor.tested.append('get_usrp_rx_info')
- method_executor.tested.append('get_usrp_tx_info')
- method_executor.tested.append('get_num_mboards')
+ method_executor.tested.extend((
+ 'make',
+ 'issue_stream_cmd', # Gets called by recv_num_samps
+ 'get_rx_num_channels', # Got called above
+ 'get_tx_num_channels', # Got called above
+ 'get_usrp_rx_info',
+ 'get_usrp_tx_info',
+ 'get_num_mboards', # Got called above
+ 'get_rx_stream',
+ 'get_tx_stream', # Require stream_args_t
+ ))
+ method_executor.tested.extend(device_config.get('imply_tested', []))
actual_tests = [
(['get_rx_freq', 'set_rx_freq', 'get_rx_freq_range'],
lambda: range_test(usrp, 'rx_freq', num_rx_chans, 'coerce',
- uhd.types.TuneRequest)),
+ arg_converter=uhd.types.TuneRequest)),
(['get_rx_gain', 'set_rx_gain', 'get_rx_gain_range'],
lambda: range_test(usrp, 'rx_gain', num_rx_chans, 'coerce')),
(['get_rx_bandwidth', 'set_rx_bandwidth', 'get_rx_bandwidth_range'],
lambda: range_test(usrp, 'rx_bandwidth', num_rx_chans, 'coerce')),
(['get_rx_lo_freq', 'set_rx_lo_freq', 'get_rx_lo_freq_range'],
lambda: range_test(usrp, 'rx_lo_freq', num_rx_chans, 'coerce',
- 'lo_name')),
+ args_type='lo_name')),
(['get_rx_rate', 'set_rx_rate', 'get_rx_rates'],
lambda: discrete_options_test(usrp, 'rx_rate', num_rx_chans,
'coerce')),
- (['get_time_source', 'set_time_source', 'get_time_source_names'],
+ (['get_time_source', 'set_time_source', 'get_time_sources'],
lambda: list_test(usrp, 'time_source', 'coerce',
- lambda: usrp.set_time_source('internal'))),
- (['get_clock_source', 'set_clock_source', 'get_clock_names'],
- lambda: list_test(usrp, 'clock_source', 'coerce')),
- (['get_rx_antenna', 'set_rx_antenna', 'get_rx_antenna_names'],
+ safe_values=device_config.get('time_sources', ('internal')))),
+ (['get_clock_source', 'set_clock_source', 'get_clock_sources'],
+ lambda: list_test(usrp, 'clock_source', 'coerce',
+ safe_values=device_config.get('clock_sources', ('internal')))),
+ (['get_sync_source', 'set_sync_source', 'get_sync_sources'],
+ lambda: list_test(
+ usrp, 'clock_source', 'coerce',
+ safe_values=device_config.get('sync_sources', ({
+ 'clock_source': 'internal',
+ 'time_source': 'internal',
+ })))),
+ (['get_rx_antenna', 'set_rx_antenna', 'get_rx_antennas'],
lambda: list_test(usrp, 'rx_antenna', 'coerce')),
- (['get_tx_antenna', 'set_tx_antenna', 'get_tx_antenna_names'],
+ (['get_tx_antenna', 'set_tx_antenna', 'get_tx_antennas'],
lambda: list_test(usrp, 'tx_antenna', 'coerce')),
(['get_rx_lo_source', 'set_rx_lo_source', 'get_rx_lo_source_names'],
- lambda: list_test(usrp, 'rx_lo_source', 'coerce')),
- (['get_normalized_rx_gain', 'set_normalized_rx_gain',
- 'get_normalied_rx_gain_range'],
+ lambda: list_test(usrp, 'rx_lo_source', error_handling='throw')),
+ (['get_normalized_rx_gain', 'set_normalized_rx_gain'],
lambda: range_test(usrp, 'normalized_rx_gain', num_rx_chans,
- 'coerce', [0, 1])),
+ error_handling='throw', get_range=[0, 1])),
(['get_rx_subdev_name'],
lambda: get_test(usrp, 'rx_subdev_name', num_rx_chans)),
(['get_rx_subdev_spec'],
@@ -476,7 +569,7 @@ def run_api_test(usrp):
(['get_tx_freq', 'set_tx_freq', 'get_tx_freq_range'],
lambda: range_test(usrp, 'tx_freq', num_tx_chans, 'coerce',
- uhd.types.TuneRequest)),
+ arg_converter=uhd.types.TuneRequest)),
(['get_tx_gain', 'set_tx_gain', 'get_tx_gain_range'],
lambda: range_test(usrp, 'tx_gain', num_tx_chans, 'coerce')),
(['get_tx_bandwidth', 'set_tx_bandwidth', 'get_tx_bandwidth_range'],
@@ -489,9 +582,9 @@ def run_api_test(usrp):
'coerce')),
(['get_tx_lo_source', 'set_tx_lo_source', 'get_tx_lo_names'],
lambda: list_test(usrp, 'tx_lo_source', 'coerce')),
- (['get_normalized_tx_gain', 'set_normalized_tx_gain', 'get_normalized_tx_gain_range'],
+ (['get_normalized_tx_gain', 'set_normalized_tx_gain'],
lambda: range_test(usrp, 'normalized_tx_gain', num_rx_chans,
- 'coerce', [0, 1])),
+ error_handling='throw', get_range=[0, 1])),
(['get_tx_subdev_name'],
lambda: get_test(usrp, "tx_subdev_name", num_tx_chans)),
(['get_tx_subdev_spec'],
@@ -507,7 +600,7 @@ def run_api_test(usrp):
(['get_rx_gain_profile', 'set_rx_gain_profile', 'get_rx_gain_profile_names'],
lambda: list_test(usrp, 'rx_gain_profile', 'coerce')),
(['get_master_clock_rate', 'set_master_clock_rate', 'get_master_clock_rate_range'],
- lambda: range_test(usrp, 'master_clock_rate', 1, 'throw')),
+ lambda: range_test(usrp, 'master_clock_rate', 1, 'coerce')),
(['get_mboard_sensor_names', 'get_mboard_sensor'],
lambda: test_sensor_api(usrp, 'mboard', num_mboards)),
(['get_tx_sensor_names', 'get_tx_sensor'],
@@ -534,23 +627,16 @@ def run_api_test(usrp):
lambda: chan_range_test(usrp, "set_tx_dc_offset", num_tx_chans)),
(['set_rx_agc'],
lambda: chan_range_test(usrp, "set_rx_agc", num_rx_chans)),
- (['get_gpio_attr'],
- lambda: gpio_attr_test(usrp, "get_gpio_attr", num_mboards)),
- (['set_gpio_attr'],
- lambda: gpio_attr_test(usrp, "set_gpio_attr", num_mboards)),
+ (['get_gpio_attr', 'set_gpio_attr', 'get_gpio_banks'],
+ lambda: gpio_attr_test(usrp, num_mboards)),
(['get_fe_rx_freq_range'], usrp.get_fe_rx_freq_range),
(['get_fe_tx_freq_range'], usrp.get_fe_tx_freq_range),
- (['get_normalized_tx_gain'], usrp.get_normalized_tx_gain),
(['get_pp_string'], usrp.get_pp_string),
- (['get_rx_antennas'], usrp.get_rx_antennas),
(['get_rx_gain_names'], usrp.get_rx_gain_names),
(['get_rx_lo_export_enabled'], usrp.get_rx_lo_export_enabled),
(['get_rx_lo_names'], usrp.get_rx_lo_names),
(['get_rx_lo_sources'], usrp.get_rx_lo_sources),
- (['get_time_sources'],
- lambda: mboard_range_test(usrp, "get_time_sources", num_mboards)),
(['get_time_synchronized'], usrp.get_time_synchronized),
- (['get_tx_antennas'], usrp.get_tx_antennas),
(['get_tx_gain_names'], usrp.get_tx_gain_names),
(['get_tx_lo_export_enabled'], usrp.get_tx_lo_export_enabled),
(['get_tx_lo_sources'], usrp.get_tx_lo_sources),
@@ -558,27 +644,31 @@ def run_api_test(usrp):
lambda: iq_balance_test(usrp, "set_rx_iq_balance", num_rx_chans)),
(['set_tx_iq_balance'],
lambda: iq_balance_test(usrp, "set_tx_iq_balance", num_tx_chans)),
- (['get_clock_sources'],
- lambda: mboard_range_test(usrp, "get_clock_sources", num_mboards)),
(['get_rx_filter', 'set_rx_filter', 'get_rx_filter_names'],
lambda: filter_test(usrp, "rx", num_rx_chans)),
(['get_tx_filter', 'set_tx_filter', 'get_tx_filter_names'],
lambda: filter_test(usrp, "tx", num_rx_chans)),
(['clear_command_time'], usrp.clear_command_time),
+ (['get_tree'], lambda: tree_test(usrp)),
+ (['has_rx_power_reference',
+ 'set_rx_power_reference',
+ 'get_rx_power_reference',
+ 'get_rx_power_range',
+ ], lambda: power_test(usrp, 'rx', num_rx_chans),
+ ),
+ (['has_tx_power_reference',
+ 'set_tx_power_reference',
+ 'get_tx_power_reference',
+ 'get_tx_power_range',
+ ], lambda: power_test(usrp, 'rx', num_rx_chans),
+ ),
]
- white_listed = ['__class__', '__delattr__', '__dict__', '__doc__',
- '__format__', '__getattribute__', '__hash__', '__init__',
- '__module__', '__new__', '__reduce__', '__reduce_ex__',
- '__repr__', '__setattr__', '__sizeof__', '__str__',
- '__subclasshook__', '__weakref__',
- 'make',
- 'issue_stream_cmd',
- 'set_rx_lo_export_enabled', # Not supported w/all devices.
+ # List of tests that we don't test, but that's OK.
+ white_listed = ['set_rx_lo_export_enabled', # Not supported w/all devices.
'set_tx_lo_export_enabled', # Not supported w/all devices.
'set_time_source_out', # Not supported on all devices.
'get_register_info', # Requires path to register
- 'get_rx_stream', 'get_tx_stream', # Require stream_args_t
# Need register path, but enumerate_registers returns None
'read_register', 'write_register',
'set_command_time', # Time_spec_t required
@@ -590,24 +680,31 @@ def run_api_test(usrp):
'set_time_unknown_pps',
'get_radio_control',
'get_mb_controller',
+ 'get_mpm_client',
]
+ blacklist = device_config.get('skip', [])
success = True
# ...then we can run them all through the executor like this:
for method_names, test_callback in actual_tests:
+ if any(method in blacklist for method in method_names):
+ continue
if not method_executor.execute_methods(method_names, test_callback):
success = False
- untested = [test for test in dir(usrp) if test not in
- method_executor.tested and test not in white_listed]
+ untested = [
+ test + (" [blacklisted]" if test in blacklist else "") for test in dir(usrp)
+ if test not in method_executor.tested \
+ and test not in white_listed \
+ and not test.startswith('_')
+ ]
if method_executor.failed:
print("The following API calls caused failures:")
- print(method_executor.failed)
-
+ print("* " + "\n* ".join(method_executor.failed))
if untested:
print("The following functions were not tested:")
- print(untested)
+ print("* " + "\n* ".join(untested))
return success
@@ -618,22 +715,69 @@ def parse_args():
"""
parser = argparse.ArgumentParser()
parser.add_argument(
- '--args',
- default='None'
+ '--args', default='',
+ )
+ parser.add_argument(
+ '--dump-defaults',
+ help="Specify a device type, and the default config will be dumped as YAML"
+ )
+ parser.add_argument(
+ '--device-config',
+ help="Specify path to YAML file to use as device config"
)
return parser.parse_args()
+def get_device_config(usrp_type, device_config_path=None):
+ """
+ Return a device configuration object.
+ """
+ if device_config_path:
+ with open(device_config_path, 'r') as yaml_f:
+ return yaml.load(yaml_f)
+ if usrp_type in ('B205mini', 'B200mini', 'B200', 'B210'):
+ return {
+ 'skip': [
+ 'set_rx_lo_export_enabled',
+ 'set_tx_lo_export_enabled',
+ 'get_rx_lo_source',
+ 'set_rx_lo_source',
+ 'get_rx_lo_source_names',
+ 'get_tx_lo_source',
+ 'set_tx_lo_source',
+ 'get_tx_lo_names',
+ 'get_master_clock_rate',
+ 'set_master_clock_rate',
+ 'get_master_clock_rate_range',
+ 'get_gpio_attr',
+ 'set_gpio_attr',
+ 'get_gpio_banks',
+ ],
+ }
+ return {}
+
+def dump_defaults(usrp_type):
+ """
+ Print the hard-coded defaults as YAML
+ """
+ defaults = get_device_config(usrp_type)
+ print(yaml.dump(defaults, default_flow_style=False))
+
def main():
"""
Returns True on Success
"""
args = parse_args()
+ if args.dump_defaults:
+ dump_defaults(args.dump_defaults)
+ return 0
usrp = uhd.usrp.MultiUSRP(args.args)
- ret_val = run_api_test(usrp)
+ usrp_type = usrp.get_usrp_rx_info().get('mboard_id')
+ device_config = get_device_config(usrp_type, args.device_config)
+ ret_val = run_api_test(usrp, device_config)
if ret_val != 1:
raise Exception("Python API Tester Received Errors")
return ret_val
if __name__ == "__main__":
- exit(not main())
+ sys.exit(not main())