aboutsummaryrefslogtreecommitdiffstats
path: root/host/tests/devtest/recv_stability_test.py
blob: 41707c7d3f0238ea213371376700d4d7a328da3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3
#
# Copyright 2021 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
""" Test recv, including some corner cases. """

import sys
import time
import argparse
import numpy as np
import uhd
try:
    from ruamel import yaml
except:
    import yaml


def run_recv_stability_test(usrp, device_config):
    """
    Run tests related to recv().
    """
    num_chans = device_config.get('num_channels', usrp.get_rx_num_channels())
    str_args = uhd.usrp.StreamArgs(
        device_config.get('cpu_type', 'fc32'),
        device_config.get('cpu_type', 'sc16'),
    )
    str_args.channels = list(range(num_chans))
    rx_streamer = usrp.get_rx_stream(str_args)
    usrp.set_rx_rate(device_config.get('rate', 1e6))
    # Run tests
    run_choke_test(usrp, rx_streamer, device_config)
    return True

def run_choke_test(usrp, rx_streamer, device_config):
    """
    This will kick off a continuous stream, then interrupt it, then resume it.
    We verify that:
    - We get an overrun message (metadata)
    - The stream resumes.
    """

    bufsize = 100 * rx_streamer.get_max_num_samps()
    recv_buf = np.zeros(
        (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
    stream_cmd.stream_now = False
    init_delay = device_config.get('init_delay', 1.0)
    stream_cmd.time_spec = usrp.get_time_now() + init_delay
    rx_streamer.issue_stream_cmd(stream_cmd)
    metadata = uhd.types.RXMetadata()
    long_timeout = init_delay + 1.0
    num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
    test_pass = True
    # Large timeout, small rate: This should not fail
    if num_samps_recvd != bufsize:
        test_pass = False
        print(f"run_choke_test(): First buffer recv() failed, rx'd "
              f"{num_samps_recvd}/{bufsize} samples!")
    if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
        test_pass = False
        print(
            f"run_choke_test(): First buffer recv() failed, rx'd "
            f"error code: {metadata.strerror()}")
    print("Choking RX...")
    time.sleep(1)
    print("Now, collecting the overrun (you should see an 'O' now).")
    # On one of the next recv(), we should get an overrun. It may take a bit,
    # because there will already be data in the pipe and depending on the USRP,
    # overruns will be inline.
    max_num_samps_before_o = device_config.get('max_num_samps_before_o', 1000000)
    num_samps_recvd = 0
    overrun_recvd = False
    while num_samps_recvd < max_num_samps_before_o:
        num_samps_recvd += rx_streamer.recv(recv_buf, metadata, timeout=.1)
        if metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
            overrun_recvd = True
            break
        if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
            test_pass = False
            print(
                f"run_choke_test(): Second buffer recv() failed, rx'd "
                f"error code: {metadata.strerror()} (should have been overflow or none)")
    if not overrun_recvd:
        test_pass = False
        print(
            f"run_choke_test(): Second buffer recv() failed, never rx'd "
            f"an overflow")
    # It should recover now:
    num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
    if num_samps_recvd != bufsize:
        test_pass = False
        print(
            f"run_choke_test(): Third buffer recv() failed, rx'd "
            f"{num_samps_recvd}/{bufsize} samples!")
    if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
        test_pass = False
        print(
            f"run_choke_test(): Third buffer recv() failed, rx'd "
            f"error code: {metadata.strerror()}")
    stop_and_flush(rx_streamer)
    if not test_pass:
        raise RuntimeError("run_choke_test(): Test failed.")

def stop_and_flush(rx_streamer):
    """
    Utility to stop a streamer and clear the FIFO.
    """
    print("Flushing FIFOs...")
    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
    bufsize = rx_streamer.get_max_num_samps()
    recv_buf = np.zeros(
        (rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
    rx_streamer.issue_stream_cmd(stream_cmd)
    metadata = uhd.types.RXMetadata()
    stop_time = time.monotonic() + 5.0
    while time.monotonic() < stop_time:
        rx_streamer.recv(recv_buf, metadata, timeout=0)
        if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout:
            break

def parse_args():
    """
    Parse args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--args', default='',
    )
    parser.add_argument(
        '--dump-defaults',
        help="Specify a device type, and the default config will be dumped as YAML"
    )
    parser.add_argument(
        '--device-config',
        help="Specify path to YAML file to use as device config"
    )
    return parser.parse_args()

def get_device_config(usrp_type, device_config_path=None):
    """
    Return a device configuration object.
    """
    if device_config_path:
        with open(device_config_path, 'r') as yaml_f:
            return yaml.load(yaml_f)
    return {}

def dump_defaults(usrp_type):
    """
    Print the hard-coded defaults as YAML
    """
    defaults = get_device_config(usrp_type)
    print(yaml.dump(defaults, default_flow_style=False))

def main():
    """
    Returns True on Success
    """
    args = parse_args()
    if args.dump_defaults:
        dump_defaults(args.dump_defaults)
        return 0
    usrp = uhd.usrp.MultiUSRP(args.args)
    usrp_type = usrp.get_usrp_rx_info().get('mboard_id')
    device_config = get_device_config(usrp_type, args.device_config)
    ret_val = run_recv_stability_test(usrp, device_config)
    if ret_val != 1:
        raise Exception("Python API Tester Received Errors")
    return ret_val

if __name__ == "__main__":
    sys.exit(not main())