diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/tests/devtest/recv_stability_test.py | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/host/tests/devtest/recv_stability_test.py b/host/tests/devtest/recv_stability_test.py new file mode 100644 index 000000000..41707c7d3 --- /dev/null +++ b/host/tests/devtest/recv_stability_test.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +# +# Copyright 2021 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" Test recv, including some corner cases. """ + +import sys +import time +import argparse +import numpy as np +import uhd +try: + from ruamel import yaml +except: + import yaml + + +def run_recv_stability_test(usrp, device_config): + """ + Run tests related to recv(). + """ + num_chans = device_config.get('num_channels', usrp.get_rx_num_channels()) + str_args = uhd.usrp.StreamArgs( + device_config.get('cpu_type', 'fc32'), + device_config.get('cpu_type', 'sc16'), + ) + str_args.channels = list(range(num_chans)) + rx_streamer = usrp.get_rx_stream(str_args) + usrp.set_rx_rate(device_config.get('rate', 1e6)) + # Run tests + run_choke_test(usrp, rx_streamer, device_config) + return True + +def run_choke_test(usrp, rx_streamer, device_config): + """ + This will kick off a continuous stream, then interrupt it, then resume it. + We verify that: + - We get an overrun message (metadata) + - The stream resumes. + """ + + bufsize = 100 * rx_streamer.get_max_num_samps() + recv_buf = np.zeros( + (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64) + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) + stream_cmd.stream_now = False + init_delay = device_config.get('init_delay', 1.0) + stream_cmd.time_spec = usrp.get_time_now() + init_delay + rx_streamer.issue_stream_cmd(stream_cmd) + metadata = uhd.types.RXMetadata() + long_timeout = init_delay + 1.0 + num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout) + test_pass = True + # Large timeout, small rate: This should not fail + if num_samps_recvd != bufsize: + test_pass = False + print(f"run_choke_test(): First buffer recv() failed, rx'd " + f"{num_samps_recvd}/{bufsize} samples!") + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + test_pass = False + print( + f"run_choke_test(): First buffer recv() failed, rx'd " + f"error code: {metadata.strerror()}") + print("Choking RX...") + time.sleep(1) + print("Now, collecting the overrun (you should see an 'O' now).") + # On one of the next recv(), we should get an overrun. It may take a bit, + # because there will already be data in the pipe and depending on the USRP, + # overruns will be inline. + max_num_samps_before_o = device_config.get('max_num_samps_before_o', 1000000) + num_samps_recvd = 0 + overrun_recvd = False + while num_samps_recvd < max_num_samps_before_o: + num_samps_recvd += rx_streamer.recv(recv_buf, metadata, timeout=.1) + if metadata.error_code == uhd.types.RXMetadataErrorCode.overflow: + overrun_recvd = True + break + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + test_pass = False + print( + f"run_choke_test(): Second buffer recv() failed, rx'd " + f"error code: {metadata.strerror()} (should have been overflow or none)") + if not overrun_recvd: + test_pass = False + print( + f"run_choke_test(): Second buffer recv() failed, never rx'd " + f"an overflow") + # It should recover now: + num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout) + if num_samps_recvd != bufsize: + test_pass = False + print( + f"run_choke_test(): Third buffer recv() failed, rx'd " + f"{num_samps_recvd}/{bufsize} samples!") + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + test_pass = False + print( + f"run_choke_test(): Third buffer recv() failed, rx'd " + f"error code: {metadata.strerror()}") + stop_and_flush(rx_streamer) + if not test_pass: + raise RuntimeError("run_choke_test(): Test failed.") + +def stop_and_flush(rx_streamer): + """ + Utility to stop a streamer and clear the FIFO. + """ + print("Flushing FIFOs...") + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) + bufsize = rx_streamer.get_max_num_samps() + recv_buf = np.zeros( + (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64) + rx_streamer.issue_stream_cmd(stream_cmd) + metadata = uhd.types.RXMetadata() + stop_time = time.monotonic() + 5.0 + while time.monotonic() < stop_time: + rx_streamer.recv(recv_buf, metadata, timeout=0) + if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: + break + +def parse_args(): + """ + Parse args. + """ + parser = argparse.ArgumentParser() + parser.add_argument( + '--args', default='', + ) + parser.add_argument( + '--dump-defaults', + help="Specify a device type, and the default config will be dumped as YAML" + ) + parser.add_argument( + '--device-config', + help="Specify path to YAML file to use as device config" + ) + return parser.parse_args() + +def get_device_config(usrp_type, device_config_path=None): + """ + Return a device configuration object. + """ + if device_config_path: + with open(device_config_path, 'r') as yaml_f: + return yaml.load(yaml_f) + return {} + +def dump_defaults(usrp_type): + """ + Print the hard-coded defaults as YAML + """ + defaults = get_device_config(usrp_type) + print(yaml.dump(defaults, default_flow_style=False)) + +def main(): + """ + Returns True on Success + """ + args = parse_args() + if args.dump_defaults: + dump_defaults(args.dump_defaults) + return 0 + usrp = uhd.usrp.MultiUSRP(args.args) + usrp_type = usrp.get_usrp_rx_info().get('mboard_id') + device_config = get_device_config(usrp_type, args.device_config) + ret_val = run_recv_stability_test(usrp, device_config) + if ret_val != 1: + raise Exception("Python API Tester Received Errors") + return ret_val + +if __name__ == "__main__": + sys.exit(not main()) |