1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
#!/usr/bin/env python3
#
# Copyright 2021 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
""" Test recv, including some corner cases. """
import sys
import time
import argparse
import numpy as np
import uhd
try:
from ruamel import yaml
except:
import yaml
def run_recv_stability_test(usrp, device_config):
"""
Run tests related to recv().
"""
num_chans = device_config.get('num_channels', usrp.get_rx_num_channels())
str_args = uhd.usrp.StreamArgs(
device_config.get('cpu_type', 'fc32'),
device_config.get('cpu_type', 'sc16'),
)
str_args.channels = list(range(num_chans))
rx_streamer = usrp.get_rx_stream(str_args)
usrp.set_rx_rate(device_config.get('rate', 1e6))
# Run tests
run_choke_test(usrp, rx_streamer, device_config)
return True
def run_choke_test(usrp, rx_streamer, device_config):
"""
This will kick off a continuous stream, then interrupt it, then resume it.
We verify that:
- We get an overrun message (metadata)
- The stream resumes.
"""
bufsize = 100 * rx_streamer.get_max_num_samps()
recv_buf = np.zeros(
(rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
stream_cmd.stream_now = False
init_delay = device_config.get('init_delay', 1.0)
stream_cmd.time_spec = usrp.get_time_now() + init_delay
rx_streamer.issue_stream_cmd(stream_cmd)
metadata = uhd.types.RXMetadata()
long_timeout = init_delay + 1.0
num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
test_pass = True
# Large timeout, small rate: This should not fail
if num_samps_recvd != bufsize:
test_pass = False
print(f"run_choke_test(): First buffer recv() failed, rx'd "
f"{num_samps_recvd}/{bufsize} samples!")
if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
test_pass = False
print(
f"run_choke_test(): First buffer recv() failed, rx'd "
f"error code: {metadata.strerror()}")
print("Choking RX...")
time.sleep(1)
print("Now, collecting the overrun (you should see an 'O' now).")
# On one of the next recv(), we should get an overrun. It may take a bit,
# because there will already be data in the pipe and depending on the USRP,
# overruns will be inline.
max_num_samps_before_o = device_config.get('max_num_samps_before_o', 1000000)
num_samps_recvd = 0
overrun_recvd = False
while num_samps_recvd < max_num_samps_before_o:
num_samps_recvd += rx_streamer.recv(recv_buf, metadata, timeout=.1)
if metadata.error_code == uhd.types.RXMetadataErrorCode.overflow:
overrun_recvd = True
break
if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
test_pass = False
print(
f"run_choke_test(): Second buffer recv() failed, rx'd "
f"error code: {metadata.strerror()} (should have been overflow or none)")
if not overrun_recvd:
test_pass = False
print(
f"run_choke_test(): Second buffer recv() failed, never rx'd "
f"an overflow")
# It should recover now:
num_samps_recvd = rx_streamer.recv(recv_buf, metadata, timeout=long_timeout)
if num_samps_recvd != bufsize:
test_pass = False
print(
f"run_choke_test(): Third buffer recv() failed, rx'd "
f"{num_samps_recvd}/{bufsize} samples!")
if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
test_pass = False
print(
f"run_choke_test(): Third buffer recv() failed, rx'd "
f"error code: {metadata.strerror()}")
stop_and_flush(rx_streamer)
if not test_pass:
raise RuntimeError("run_choke_test(): Test failed.")
def stop_and_flush(rx_streamer):
"""
Utility to stop a streamer and clear the FIFO.
"""
print("Flushing FIFOs...")
stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
bufsize = rx_streamer.get_max_num_samps()
recv_buf = np.zeros(
(rx_streamer.get_num_channels(), bufsize), dtype=np.complex64)
rx_streamer.issue_stream_cmd(stream_cmd)
metadata = uhd.types.RXMetadata()
stop_time = time.monotonic() + 5.0
while time.monotonic() < stop_time:
rx_streamer.recv(recv_buf, metadata, timeout=0)
if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout:
break
def parse_args():
"""
Parse args.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--args', default='',
)
parser.add_argument(
'--dump-defaults',
help="Specify a device type, and the default config will be dumped as YAML"
)
parser.add_argument(
'--device-config',
help="Specify path to YAML file to use as device config"
)
return parser.parse_args()
def get_device_config(usrp_type, device_config_path=None):
"""
Return a device configuration object.
"""
if device_config_path:
with open(device_config_path, 'r') as yaml_f:
return yaml.load(yaml_f)
return {}
def dump_defaults(usrp_type):
"""
Print the hard-coded defaults as YAML
"""
defaults = get_device_config(usrp_type)
print(yaml.dump(defaults, default_flow_style=False))
def main():
"""
Returns True on Success
"""
args = parse_args()
if args.dump_defaults:
dump_defaults(args.dump_defaults)
return 0
usrp = uhd.usrp.MultiUSRP(args.args)
usrp_type = usrp.get_usrp_rx_info().get('mboard_id')
device_config = get_device_config(usrp_type, args.device_config)
ret_val = run_recv_stability_test(usrp, device_config)
if ret_val != 1:
raise Exception("Python API Tester Received Errors")
return ret_val
if __name__ == "__main__":
sys.exit(not main())
|