diff options
-rwxr-xr-x | tools/gr-usrptest/apps/rx_settling_time.py | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/tools/gr-usrptest/apps/rx_settling_time.py b/tools/gr-usrptest/apps/rx_settling_time.py new file mode 100755 index 000000000..d58eeb9dc --- /dev/null +++ b/tools/gr-usrptest/apps/rx_settling_time.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +RX samples and apply settings as a timed command. Use this tool to analyze the +settling time of analog components as well as the accuracy of timed commands. +Typically, you will need to connect a tone or other signal generator to the +DUT's input. + +Example: This would receive several seconds of data from an X3x0 device, +tune to 1 GHz, and then bump the gain by 30 dB after a set amount of +time: + +$ rx_settling_time.py -a type=x300 -f 1e9 -g 0 --new-gain 30 --plot +""" + +from __future__ import print_function +import argparse +import numpy as np +from six import iteritems +import uhd + + +def parse_args(): + """Parse the command line arguments""" + parser = argparse.ArgumentParser( + description=__doc__ + ) + parser.add_argument( + "-a", "--args", default="", + help="Device args (e.g., 'type=x300')") + parser.add_argument( + "--spec", + help="Subdev spec (e.g. 'B:0')") + parser.add_argument( + "-d", "--duration", default=5.0, type=float, + help="Total acquisition time") + parser.add_argument( + "--setup-delay", default=.5, type=float, + help="Time before starting receive") + parser.add_argument( + "--set-delay", default=2.0, type=float, + help="Time between starting to receive and changing settings") + parser.add_argument( + "--skip-time", default=0.0, type=float, + help="Time to skip after starting to receive") + parser.add_argument( + "-o", "--output-file", type=str, + help="Name of the output file (e.g., 'output.dat')") + parser.add_argument( + "-f", "--freq", type=float, required=True, + help="Initial frequency") + parser.add_argument( + "-g", "--gain", type=float, default=20.0, + help="Initial gain") + parser.add_argument( + "--new-freq", type=float, + help="Frequency after set time") + parser.add_argument( + "--new-gain", + help="Gain after set time") + parser.add_argument( + "-r", "--rate", default=1e6, type=float, + help="Sampling rate (Hz)") + parser.add_argument( + "-c", "--channel", default=0, type=int, help="Channel on which to receive on") + parser.add_argument( + "-n", "--numpy", default=False, action="store_true", + help="Save output file in NumPy format (default: No)") + parser.add_argument( + "--plot", default=False, action="store_true", + help="Show nice pic") + return parser.parse_args() + + + +def get_rx_streamer(usrp, chan): + """ + Return a streamer + """ + st_args = uhd.usrp.StreamArgs("fc32", "sc16") + st_args.channels = [chan,] + return usrp.get_rx_stream(st_args) + + +def apply_initial_settings(usrp, chan, rate, freq, gain): + """ + Apply initial settings for: + - freq + - gain + - rate + """ + usrp.set_rx_rate(rate) + tune_req = uhd.types.TuneRequest(freq) + usrp.set_rx_freq(tune_req, chan) + usrp.set_rx_gain(gain, chan) + + +def start_rx_stream(streamer, start_time): + """ + Kick off the RX streamer + """ + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) + stream_cmd.stream_now = False + stream_cmd.time_spec = start_time + streamer.issue_stream_cmd(stream_cmd) + + +def load_commands(usrp, chan, cmd_time, **kwargs): + """ + Load the switching commands. + """ + usrp.set_command_time(cmd_time) + kw_cb_map = { + 'freq': lambda freq: usrp.set_rx_freq(uhd.types.TuneRequest(float(freq)), chan), + 'gain': lambda gain: usrp.set_rx_gain(float(gain), chan), + } + for key, callback in iteritems(kw_cb_map): + if kwargs.get(key) is not None: + callback(kwargs[key]) + usrp.clear_command_time() + + +def recv_samples(rx_streamer, total_num_samps, skip_samples): + """ + Run the receive loop and crop samples. + """ + metadata = uhd.types.RXMetadata() + result = np.empty((1, total_num_samps), dtype=np.complex64) + total_samps_recvd = 0 + timeouts = 0 # This is a bit of a hack, until we can pass timeout values to + # Python + max_timeouts = 20 + buffer_samps = rx_streamer.get_max_num_samps() + recv_buffer = np.zeros( + (1, buffer_samps), dtype=np.complex64) + while total_samps_recvd < total_num_samps: + samps_recvd = rx_streamer.recv(recv_buffer, metadata) + if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: + timeouts += 1 + if timeouts >= max_timeouts: + print("[ERROR] Reached timeout threshold. Exiting.") + return None + elif metadata.error_code != uhd.types.RXMetadataErrorCode.none: + print("[ERROR] " + metadata.strerror()) + return None + if samps_recvd: + samps_recvd = min(total_num_samps - total_samps_recvd, samps_recvd) + result[:, total_samps_recvd:total_samps_recvd + samps_recvd] = \ + recv_buffer[:, 0:samps_recvd] + total_samps_recvd += samps_recvd + if skip_samples: + print("Skipping {} samples.".format(skip_samples)) + return result[0][skip_samples:] + + +def save_to_file(samps, filename, save_as_numpy): + """ + Save samples to binary file + """ + with open(filename, 'wb') as out_file: + if save_as_numpy: + np.save(out_file, samps, allow_pickle=False, fix_imports=False) + else: + samps.tofile(out_file) + +def plot_samps(samps, rate, set_offset): + """ + Show a nice piccie + """ + try: + import pylab + except ImportError: + print("[ERROR] --plot requires pylab.") + return + ylim = max( + max(np.abs(np.real(samps))), + max(np.abs(np.imag(samps))), + ) + time_axis = np.arange(len(samps)) / rate - set_offset + pylab.plot(time_axis, np.real(samps)) + pylab.plot(time_axis, np.imag(samps)) + pylab.ylim((-ylim, ylim)) + pylab.grid(True) + pylab.xlabel('Time offset [s]') + pylab.ylabel('Amplitude') + pylab.legend(('In-Phase', 'Quadrature')) + pylab.title('Settling Time') + pylab.show() + + +def main(): + """Execute""" + args = parse_args() + usrp = uhd.usrp.MultiUSRP(args.args) + if args.spec is not None: + usrp.set_rx_subdev_spec(uhd.usrp.SubdevSpec(args.spec)) + rx_streamer = get_rx_streamer(usrp, args.channel) + total_num_samps = int(args.duration * args.rate) + skip_samps = int(args.skip_time * args.rate) + print("Total number of samples to acquire: {}".format(total_num_samps)) + apply_initial_settings( + usrp, + args.channel, + args.rate, + args.freq, + args.gain + ) + time_zero = usrp.get_time_now() + print("Sending stream commands...") + start_rx_stream( + rx_streamer, + time_zero+args.setup_delay, + ) + print("Preloading set commands...") + load_commands( + usrp=usrp, + chan=args.channel, + cmd_time=time_zero+args.setup_delay+args.set_delay, + freq=args.new_freq, + gain=args.new_gain, + ) + print("Starting receive...") + samps = recv_samples(rx_streamer, total_num_samps, skip_samps) + if samps is None: + return False + print("Received {} samples.".format(samps.size)) + print("New settings are applied at sample index {}." + .format(int((args.set_delay - args.skip_time) * args.rate))) + if args.plot: + plot_samps( + samps, + args.rate, + args.set_delay - args.skip_time, + ) + if args.output_file: + save_to_file( + samps, + args.output_file, + args.numpy, + ) + return True + +if __name__ == "__main__": + exit(not main()) + |