From d0e75f66fabacd2bc97981df6b0525b196a86fc5 Mon Sep 17 00:00:00 2001 From: Martin Braun Date: Tue, 21 Sep 2021 22:15:02 +0200 Subject: devtest: Add receive stability test At this point, this test chokes an RX streamer to force an overrun. It then confirms that the overrun message is returned to the call site, and that the streamer returns to continuous streaming after the overrun handling. --- host/tests/devtest/recv_stability_test.py | 174 ++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 host/tests/devtest/recv_stability_test.py (limited to 'host/tests/devtest') diff --git a/host/tests/devtest/recv_stability_test.py b/host/tests/devtest/recv_stability_test.py new file mode 100644 index 000000000..41707c7d3 --- /dev/null +++ b/host/tests/devtest/recv_stability_test.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +# +# Copyright 2021 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" Test recv, including some corner cases. """ + +import sys +import time +import argparse +import numpy as np +import uhd +try: + from ruamel import yaml +except: + import yaml + + +def run_recv_stability_test(usrp, device_config): + """ + Run tests related to recv(). + """ + num_chans = device_config.get('num_channels', usrp.get_rx_num_channels()) + str_args = uhd.usrp.StreamArgs( + device_config.get('cpu_type', 'fc32'), + device_config.get('cpu_type', 'sc16'), + ) + str_args.channels = list(range(num_chans)) + rx_streamer = usrp.get_rx_stream(str_args) + usrp.set_rx_rate(device_config.get('rate', 1e6)) + # Run tests + run_choke_test(usrp, rx_streamer, device_config) + return True + +def run_choke_test(usrp, rx_streamer, device_config): + """ + This will kick off a continuous stream, then interrupt it, then resume it. + We verify that: + - We get an overrun message (metadata) + - The stream resumes. + """ + + bufsize = 100 * rx_streamer.get_max_num_samps() + recv_buf = np.zeros( + (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64) + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) + stream_cmd.stream_now = False + init_delay = device_config.get('init_delay', 1.0) + stream_cmd.time_spec = usrp.get_time_now() + init_delay + rx_streamer.issue_stream_cmd(stream_cmd) + metadata = uhd.types.RXMetadata() + long_timeout = init_delay + 1.0 + num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout) + test_pass = True + # Large timeout, small rate: This should not fail + if num_samps_recvd != bufsize: + test_pass = False + print(f"run_choke_test(): First buffer recv() failed, rx'd " + f"{num_samps_recvd}/{bufsize} samples!") + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + test_pass = False + print( + f"run_choke_test(): First buffer recv() failed, rx'd " + f"error code: {metadata.strerror()}") + print("Choking RX...") + time.sleep(1) + print("Now, collecting the overrun (you should see an 'O' now).") + # On one of the next recv(), we should get an overrun. It may take a bit, + # because there will already be data in the pipe and depending on the USRP, + # overruns will be inline. + max_num_samps_before_o = device_config.get('max_num_samps_before_o', 1000000) + num_samps_recvd = 0 + overrun_recvd = False + while num_samps_recvd < max_num_samps_before_o: + num_samps_recvd += rx_streamer.recv(recv_buf, metadata, timeout=.1) + if metadata.error_code == uhd.types.RXMetadataErrorCode.overflow: + overrun_recvd = True + break + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + test_pass = False + print( + f"run_choke_test(): Second buffer recv() failed, rx'd " + f"error code: {metadata.strerror()} (should have been overflow or none)") + if not overrun_recvd: + test_pass = False + print( + f"run_choke_test(): Second buffer recv() failed, never rx'd " + f"an overflow") + # It should recover now: + num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout) + if num_samps_recvd != bufsize: + test_pass = False + print( + f"run_choke_test(): Third buffer recv() failed, rx'd " + f"{num_samps_recvd}/{bufsize} samples!") + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + test_pass = False + print( + f"run_choke_test(): Third buffer recv() failed, rx'd " + f"error code: {metadata.strerror()}") + stop_and_flush(rx_streamer) + if not test_pass: + raise RuntimeError("run_choke_test(): Test failed.") + +def stop_and_flush(rx_streamer): + """ + Utility to stop a streamer and clear the FIFO. + """ + print("Flushing FIFOs...") + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) + bufsize = rx_streamer.get_max_num_samps() + recv_buf = np.zeros( + (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64) + rx_streamer.issue_stream_cmd(stream_cmd) + metadata = uhd.types.RXMetadata() + stop_time = time.monotonic() + 5.0 + while time.monotonic() < stop_time: + rx_streamer.recv(recv_buf, metadata, timeout=0) + if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: + break + +def parse_args(): + """ + Parse args. + """ + parser = argparse.ArgumentParser() + parser.add_argument( + '--args', default='', + ) + parser.add_argument( + '--dump-defaults', + help="Specify a device type, and the default config will be dumped as YAML" + ) + parser.add_argument( + '--device-config', + help="Specify path to YAML file to use as device config" + ) + return parser.parse_args() + +def get_device_config(usrp_type, device_config_path=None): + """ + Return a device configuration object. + """ + if device_config_path: + with open(device_config_path, 'r') as yaml_f: + return yaml.load(yaml_f) + return {} + +def dump_defaults(usrp_type): + """ + Print the hard-coded defaults as YAML + """ + defaults = get_device_config(usrp_type) + print(yaml.dump(defaults, default_flow_style=False)) + +def main(): + """ + Returns True on Success + """ + args = parse_args() + if args.dump_defaults: + dump_defaults(args.dump_defaults) + return 0 + usrp = uhd.usrp.MultiUSRP(args.args) + usrp_type = usrp.get_usrp_rx_info().get('mboard_id') + device_config = get_device_config(usrp_type, args.device_config) + ret_val = run_recv_stability_test(usrp, device_config) + if ret_val != 1: + raise Exception("Python API Tester Received Errors") + return ret_val + +if __name__ == "__main__": + sys.exit(not main()) -- cgit v1.2.3