aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
Diffstat (limited to 'host')
-rw-r--r--host/tests/devtest/recv_stability_test.py174
1 files changed, 174 insertions, 0 deletions
diff --git a/host/tests/devtest/recv_stability_test.py b/host/tests/devtest/recv_stability_test.py
new file mode 100644
index 000000000..41707c7d3
--- /dev/null
+++ b/host/tests/devtest/recv_stability_test.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python3
+#
+# Copyright 2021 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+""" Test recv, including some corner cases. """
+
+import sys
+import time
+import argparse
+import numpy as np
+import uhd
+try:
+ from ruamel import yaml
+except:
+ import yaml
+
+
+def run_recv_stability_test(usrp, device_config):
+ """
+ Run tests related to recv().
+ """
+ num_chans = device_config.get('num_channels', usrp.get_rx_num_channels())
+ str_args = uhd.usrp.StreamArgs(
+ device_config.get('cpu_type', 'fc32'),
+ device_config.get('cpu_type', 'sc16'),
+ )
+ str_args.channels = list(range(num_chans))
+ rx_streamer = usrp.get_rx_stream(str_args)
+ usrp.set_rx_rate(device_config.get('rate', 1e6))
+ # Run tests
+ run_choke_test(usrp, rx_streamer, device_config)
+ return True
+
+def run_choke_test(usrp, rx_streamer, device_config):
+ """
+ This will kick off a continuous stream, then interrupt it, then resume it.
+ We verify that:
+ - We get an overrun message (metadata)
+ - The stream resumes.
+ """
+
+ bufsize = 100 * rx_streamer.get_max_num_samps()
+ recv_buf = np.zeros(
+ (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
+ stream_cmd.stream_now = False
+ init_delay = device_config.get('init_delay', 1.0)
+ stream_cmd.time_spec = usrp.get_time_now() + init_delay
+ rx_streamer.issue_stream_cmd(stream_cmd)
+ metadata = uhd.types.RXMetadata()
+ long_timeout = init_delay + 1.0
+ num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
+ test_pass = True
+ # Large timeout, small rate: This should not fail
+ if num_samps_recvd != bufsize:
+ test_pass = False
+ print(f"run_choke_test(): First buffer recv() failed, rx'd "
+ f"{num_samps_recvd}/{bufsize} samples!")
+ if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
+ test_pass = False
+ print(
+ f"run_choke_test(): First buffer recv() failed, rx'd "
+ f"error code: {metadata.strerror()}")
+ print("Choking RX...")
+ time.sleep(1)
+ print("Now, collecting the overrun (you should see an 'O' now).")
+ # On one of the next recv(), we should get an overrun. It may take a bit,
+ # because there will already be data in the pipe and depending on the USRP,
+ # overruns will be inline.
+ max_num_samps_before_o = device_config.get('max_num_samps_before_o', 1000000)
+ num_samps_recvd = 0
+ overrun_recvd = False
+ while num_samps_recvd < max_num_samps_before_o:
+ num_samps_recvd += rx_streamer.recv(recv_buf, metadata, timeout=.1)
+ if metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
+ overrun_recvd = True
+ break
+ if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
+ test_pass = False
+ print(
+ f"run_choke_test(): Second buffer recv() failed, rx'd "
+ f"error code: {metadata.strerror()} (should have been overflow or none)")
+ if not overrun_recvd:
+ test_pass = False
+ print(
+ f"run_choke_test(): Second buffer recv() failed, never rx'd "
+ f"an overflow")
+ # It should recover now:
+ num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
+ if num_samps_recvd != bufsize:
+ test_pass = False
+ print(
+ f"run_choke_test(): Third buffer recv() failed, rx'd "
+ f"{num_samps_recvd}/{bufsize} samples!")
+ if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
+ test_pass = False
+ print(
+ f"run_choke_test(): Third buffer recv() failed, rx'd "
+ f"error code: {metadata.strerror()}")
+ stop_and_flush(rx_streamer)
+ if not test_pass:
+ raise RuntimeError("run_choke_test(): Test failed.")
+
+def stop_and_flush(rx_streamer):
+ """
+ Utility to stop a streamer and clear the FIFO.
+ """
+ print("Flushing FIFOs...")
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
+ bufsize = rx_streamer.get_max_num_samps()
+ recv_buf = np.zeros(
+ (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
+ rx_streamer.issue_stream_cmd(stream_cmd)
+ metadata = uhd.types.RXMetadata()
+ stop_time = time.monotonic() + 5.0
+ while time.monotonic() < stop_time:
+ rx_streamer.recv(recv_buf, metadata, timeout=0)
+ if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout:
+ break
+
+def parse_args():
+ """
+ Parse args.
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--args', default='',
+ )
+ parser.add_argument(
+ '--dump-defaults',
+ help="Specify a device type, and the default config will be dumped as YAML"
+ )
+ parser.add_argument(
+ '--device-config',
+ help="Specify path to YAML file to use as device config"
+ )
+ return parser.parse_args()
+
+def get_device_config(usrp_type, device_config_path=None):
+ """
+ Return a device configuration object.
+ """
+ if device_config_path:
+ with open(device_config_path, 'r') as yaml_f:
+ return yaml.load(yaml_f)
+ return {}
+
+def dump_defaults(usrp_type):
+ """
+ Print the hard-coded defaults as YAML
+ """
+ defaults = get_device_config(usrp_type)
+ print(yaml.dump(defaults, default_flow_style=False))
+
+def main():
+ """
+ Returns True on Success
+ """
+ args = parse_args()
+ if args.dump_defaults:
+ dump_defaults(args.dump_defaults)
+ return 0
+ usrp = uhd.usrp.MultiUSRP(args.args)
+ usrp_type = usrp.get_usrp_rx_info().get('mboard_id')
+ device_config = get_device_config(usrp_type, args.device_config)
+ ret_val = run_recv_stability_test(usrp, device_config)
+ if ret_val != 1:
+ raise Exception("Python API Tester Received Errors")
+ return ret_val
+
+if __name__ == "__main__":
+ sys.exit(not main())