aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2021-09-21 22:15:02 +0200
committerAaron Rossetto <aaron.rossetto@ni.com>2021-09-28 06:24:05 -0700
commitd0e75f66fabacd2bc97981df6b0525b196a86fc5 (patch)
treebb373b42f4d601d4b470060c3fcfa1056c4467e2
parent8111b77f74ec9547a6d63f7b36124eeb599153e3 (diff)
downloaduhd-d0e75f66fabacd2bc97981df6b0525b196a86fc5.tar.gz
uhd-d0e75f66fabacd2bc97981df6b0525b196a86fc5.tar.bz2
uhd-d0e75f66fabacd2bc97981df6b0525b196a86fc5.zip
devtest: Add receive stability test
At this point, this test chokes an RX streamer to force an overrun. It then confirms that the overrun message is returned to the call site, and that the streamer returns to continuous streaming after the overrun handling.
-rw-r--r--host/tests/devtest/recv_stability_test.py174
1 files changed, 174 insertions, 0 deletions
diff --git a/host/tests/devtest/recv_stability_test.py b/host/tests/devtest/recv_stability_test.py
new file mode 100644
index 000000000..41707c7d3
--- /dev/null
+++ b/host/tests/devtest/recv_stability_test.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python3
+#
+# Copyright 2021 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+""" Test recv, including some corner cases. """
+
+import sys
+import time
+import argparse
+import numpy as np
+import uhd
+try:
+ from ruamel import yaml
+except:
+ import yaml
+
+
+def run_recv_stability_test(usrp, device_config):
+ """
+ Run tests related to recv().
+ """
+ num_chans = device_config.get('num_channels', usrp.get_rx_num_channels())
+ str_args = uhd.usrp.StreamArgs(
+ device_config.get('cpu_type', 'fc32'),
+ device_config.get('cpu_type', 'sc16'),
+ )
+ str_args.channels = list(range(num_chans))
+ rx_streamer = usrp.get_rx_stream(str_args)
+ usrp.set_rx_rate(device_config.get('rate', 1e6))
+ # Run tests
+ run_choke_test(usrp, rx_streamer, device_config)
+ return True
+
+def run_choke_test(usrp, rx_streamer, device_config):
+ """
+ This will kick off a continuous stream, then interrupt it, then resume it.
+ We verify that:
+ - We get an overrun message (metadata)
+ - The stream resumes.
+ """
+
+ bufsize = 100 * rx_streamer.get_max_num_samps()
+ recv_buf = np.zeros(
+ (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
+ stream_cmd.stream_now = False
+ init_delay = device_config.get('init_delay', 1.0)
+ stream_cmd.time_spec = usrp.get_time_now() + init_delay
+ rx_streamer.issue_stream_cmd(stream_cmd)
+ metadata = uhd.types.RXMetadata()
+ long_timeout = init_delay + 1.0
+ num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
+ test_pass = True
+ # Large timeout, small rate: This should not fail
+ if num_samps_recvd != bufsize:
+ test_pass = False
+ print(f"run_choke_test(): First buffer recv() failed, rx'd "
+ f"{num_samps_recvd}/{bufsize} samples!")
+ if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
+ test_pass = False
+ print(
+ f"run_choke_test(): First buffer recv() failed, rx'd "
+ f"error code: {metadata.strerror()}")
+ print("Choking RX...")
+ time.sleep(1)
+ print("Now, collecting the overrun (you should see an 'O' now).")
+ # On one of the next recv(), we should get an overrun. It may take a bit,
+ # because there will already be data in the pipe and depending on the USRP,
+ # overruns will be inline.
+ max_num_samps_before_o = device_config.get('max_num_samps_before_o', 1000000)
+ num_samps_recvd = 0
+ overrun_recvd = False
+ while num_samps_recvd < max_num_samps_before_o:
+ num_samps_recvd += rx_streamer.recv(recv_buf, metadata, timeout=.1)
+ if metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
+ overrun_recvd = True
+ break
+ if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
+ test_pass = False
+ print(
+ f"run_choke_test(): Second buffer recv() failed, rx'd "
+ f"error code: {metadata.strerror()} (should have been overflow or none)")
+ if not overrun_recvd:
+ test_pass = False
+ print(
+ f"run_choke_test(): Second buffer recv() failed, never rx'd "
+ f"an overflow")
+ # It should recover now:
+ num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
+ if num_samps_recvd != bufsize:
+ test_pass = False
+ print(
+ f"run_choke_test(): Third buffer recv() failed, rx'd "
+ f"{num_samps_recvd}/{bufsize} samples!")
+ if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
+ test_pass = False
+ print(
+ f"run_choke_test(): Third buffer recv() failed, rx'd "
+ f"error code: {metadata.strerror()}")
+ stop_and_flush(rx_streamer)
+ if not test_pass:
+ raise RuntimeError("run_choke_test(): Test failed.")
+
+def stop_and_flush(rx_streamer):
+ """
+ Utility to stop a streamer and clear the FIFO.
+ """
+ print("Flushing FIFOs...")
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
+ bufsize = rx_streamer.get_max_num_samps()
+ recv_buf = np.zeros(
+ (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
+ rx_streamer.issue_stream_cmd(stream_cmd)
+ metadata = uhd.types.RXMetadata()
+ stop_time = time.monotonic() + 5.0
+ while time.monotonic() < stop_time:
+ rx_streamer.recv(recv_buf, metadata, timeout=0)
+ if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout:
+ break
+
+def parse_args():
+ """
+ Parse args.
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--args', default='',
+ )
+ parser.add_argument(
+ '--dump-defaults',
+ help="Specify a device type, and the default config will be dumped as YAML"
+ )
+ parser.add_argument(
+ '--device-config',
+ help="Specify path to YAML file to use as device config"
+ )
+ return parser.parse_args()
+
+def get_device_config(usrp_type, device_config_path=None):
+ """
+ Return a device configuration object.
+ """
+ if device_config_path:
+ with open(device_config_path, 'r') as yaml_f:
+ return yaml.load(yaml_f)
+ return {}
+
+def dump_defaults(usrp_type):
+ """
+ Print the hard-coded defaults as YAML
+ """
+ defaults = get_device_config(usrp_type)
+ print(yaml.dump(defaults, default_flow_style=False))
+
+def main():
+ """
+ Returns True on Success
+ """
+ args = parse_args()
+ if args.dump_defaults:
+ dump_defaults(args.dump_defaults)
+ return 0
+ usrp = uhd.usrp.MultiUSRP(args.args)
+ usrp_type = usrp.get_usrp_rx_info().get('mboard_id')
+ device_config = get_device_config(usrp_type, args.device_config)
+ ret_val = run_recv_stability_test(usrp, device_config)
+ if ret_val != 1:
+ raise Exception("Python API Tester Received Errors")
+ return ret_val
+
+if __name__ == "__main__":
+ sys.exit(not main())