aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2018-09-06 17:23:24 -0700
committerMartin Braun <martin.braun@ettus.com>2018-10-11 13:19:22 -0700
commitbf3d9ac2f4574feccc6578bc3eb743e7be07f50f (patch)
tree22a18f3695041c378b6b93fc22727741842385bf
parent3fd10e971a6366600ca2627f51ecd2302bcb35d8 (diff)
downloaduhd-bf3d9ac2f4574feccc6578bc3eb743e7be07f50f.tar.gz
uhd-bf3d9ac2f4574feccc6578bc3eb743e7be07f50f.tar.bz2
uhd-bf3d9ac2f4574feccc6578bc3eb743e7be07f50f.zip
tools: Add tool to analyze settling time of gain of freq changes
This tool uses the Python API to acquire a snapshot of samples during a gain or frequency change. It can be used to analyze the settling time of analog components, as well as the accuracy in time. It has two combinable ways of analyzing the data: 1) Write it to a file, or 2) plot the time-domain data. Example: This would receive several seconds of data from an X3x0 device, tune to 1 GHz, and then bump the gain by 30 dB after a set amount of time: $ rx_settling_time.py -a type=x300 -f 1e9 -g 0 --new-gain 30 --plot
-rwxr-xr-xtools/gr-usrptest/apps/rx_settling_time.py249
1 files changed, 249 insertions, 0 deletions
diff --git a/tools/gr-usrptest/apps/rx_settling_time.py b/tools/gr-usrptest/apps/rx_settling_time.py
new file mode 100755
index 000000000..d58eeb9dc
--- /dev/null
+++ b/tools/gr-usrptest/apps/rx_settling_time.py
@@ -0,0 +1,249 @@
+#!/usr/bin/env python
+#
+# Copyright 2018 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+RX samples and apply settings as a timed command. Use this tool to analyze the
+settling time of analog components as well as the accuracy of timed commands.
+Typically, you will need to connect a tone or other signal generator to the
+DUT's input.
+
+Example: This would receive several seconds of data from an X3x0 device,
+tune to 1 GHz, and then bump the gain by 30 dB after a set amount of
+time:
+
+$ rx_settling_time.py -a type=x300 -f 1e9 -g 0 --new-gain 30 --plot
+"""
+
+from __future__ import print_function
+import argparse
+import numpy as np
+from six import iteritems
+import uhd
+
+
+def parse_args():
+ """Parse the command line arguments"""
+ parser = argparse.ArgumentParser(
+ description=__doc__
+ )
+ parser.add_argument(
+ "-a", "--args", default="",
+ help="Device args (e.g., 'type=x300')")
+ parser.add_argument(
+ "--spec",
+ help="Subdev spec (e.g. 'B:0')")
+ parser.add_argument(
+ "-d", "--duration", default=5.0, type=float,
+ help="Total acquisition time")
+ parser.add_argument(
+ "--setup-delay", default=.5, type=float,
+ help="Time before starting receive")
+ parser.add_argument(
+ "--set-delay", default=2.0, type=float,
+ help="Time between starting to receive and changing settings")
+ parser.add_argument(
+ "--skip-time", default=0.0, type=float,
+ help="Time to skip after starting to receive")
+ parser.add_argument(
+ "-o", "--output-file", type=str,
+ help="Name of the output file (e.g., 'output.dat')")
+ parser.add_argument(
+ "-f", "--freq", type=float, required=True,
+ help="Initial frequency")
+ parser.add_argument(
+ "-g", "--gain", type=float, default=20.0,
+ help="Initial gain")
+ parser.add_argument(
+ "--new-freq", type=float,
+ help="Frequency after set time")
+ parser.add_argument(
+ "--new-gain",
+ help="Gain after set time")
+ parser.add_argument(
+ "-r", "--rate", default=1e6, type=float,
+ help="Sampling rate (Hz)")
+ parser.add_argument(
+ "-c", "--channel", default=0, type=int, help="Channel on which to receive on")
+ parser.add_argument(
+ "-n", "--numpy", default=False, action="store_true",
+ help="Save output file in NumPy format (default: No)")
+ parser.add_argument(
+ "--plot", default=False, action="store_true",
+ help="Show nice pic")
+ return parser.parse_args()
+
+
+
+def get_rx_streamer(usrp, chan):
+ """
+ Return a streamer
+ """
+ st_args = uhd.usrp.StreamArgs("fc32", "sc16")
+ st_args.channels = [chan,]
+ return usrp.get_rx_stream(st_args)
+
+
+def apply_initial_settings(usrp, chan, rate, freq, gain):
+ """
+ Apply initial settings for:
+ - freq
+ - gain
+ - rate
+ """
+ usrp.set_rx_rate(rate)
+ tune_req = uhd.types.TuneRequest(freq)
+ usrp.set_rx_freq(tune_req, chan)
+ usrp.set_rx_gain(gain, chan)
+
+
+def start_rx_stream(streamer, start_time):
+ """
+ Kick off the RX streamer
+ """
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
+ stream_cmd.stream_now = False
+ stream_cmd.time_spec = start_time
+ streamer.issue_stream_cmd(stream_cmd)
+
+
+def load_commands(usrp, chan, cmd_time, **kwargs):
+ """
+ Load the switching commands.
+ """
+ usrp.set_command_time(cmd_time)
+ kw_cb_map = {
+ 'freq': lambda freq: usrp.set_rx_freq(uhd.types.TuneRequest(float(freq)), chan),
+ 'gain': lambda gain: usrp.set_rx_gain(float(gain), chan),
+ }
+ for key, callback in iteritems(kw_cb_map):
+ if kwargs.get(key) is not None:
+ callback(kwargs[key])
+ usrp.clear_command_time()
+
+
+def recv_samples(rx_streamer, total_num_samps, skip_samples):
+ """
+ Run the receive loop and crop samples.
+ """
+ metadata = uhd.types.RXMetadata()
+ result = np.empty((1, total_num_samps), dtype=np.complex64)
+ total_samps_recvd = 0
+ timeouts = 0 # This is a bit of a hack, until we can pass timeout values to
+ # Python
+ max_timeouts = 20
+ buffer_samps = rx_streamer.get_max_num_samps()
+ recv_buffer = np.zeros(
+ (1, buffer_samps), dtype=np.complex64)
+ while total_samps_recvd < total_num_samps:
+ samps_recvd = rx_streamer.recv(recv_buffer, metadata)
+ if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout:
+ timeouts += 1
+ if timeouts >= max_timeouts:
+ print("[ERROR] Reached timeout threshold. Exiting.")
+ return None
+ elif metadata.error_code != uhd.types.RXMetadataErrorCode.none:
+ print("[ERROR] " + metadata.strerror())
+ return None
+ if samps_recvd:
+ samps_recvd = min(total_num_samps - total_samps_recvd, samps_recvd)
+ result[:, total_samps_recvd:total_samps_recvd + samps_recvd] = \
+ recv_buffer[:, 0:samps_recvd]
+ total_samps_recvd += samps_recvd
+ if skip_samples:
+ print("Skipping {} samples.".format(skip_samples))
+ return result[0][skip_samples:]
+
+
+def save_to_file(samps, filename, save_as_numpy):
+ """
+ Save samples to binary file
+ """
+ with open(filename, 'wb') as out_file:
+ if save_as_numpy:
+ np.save(out_file, samps, allow_pickle=False, fix_imports=False)
+ else:
+ samps.tofile(out_file)
+
+def plot_samps(samps, rate, set_offset):
+ """
+ Show a nice piccie
+ """
+ try:
+ import pylab
+ except ImportError:
+ print("[ERROR] --plot requires pylab.")
+ return
+ ylim = max(
+ max(np.abs(np.real(samps))),
+ max(np.abs(np.imag(samps))),
+ )
+ time_axis = np.arange(len(samps)) / rate - set_offset
+ pylab.plot(time_axis, np.real(samps))
+ pylab.plot(time_axis, np.imag(samps))
+ pylab.ylim((-ylim, ylim))
+ pylab.grid(True)
+ pylab.xlabel('Time offset [s]')
+ pylab.ylabel('Amplitude')
+ pylab.legend(('In-Phase', 'Quadrature'))
+ pylab.title('Settling Time')
+ pylab.show()
+
+
+def main():
+ """Execute"""
+ args = parse_args()
+ usrp = uhd.usrp.MultiUSRP(args.args)
+ if args.spec is not None:
+ usrp.set_rx_subdev_spec(uhd.usrp.SubdevSpec(args.spec))
+ rx_streamer = get_rx_streamer(usrp, args.channel)
+ total_num_samps = int(args.duration * args.rate)
+ skip_samps = int(args.skip_time * args.rate)
+ print("Total number of samples to acquire: {}".format(total_num_samps))
+ apply_initial_settings(
+ usrp,
+ args.channel,
+ args.rate,
+ args.freq,
+ args.gain
+ )
+ time_zero = usrp.get_time_now()
+ print("Sending stream commands...")
+ start_rx_stream(
+ rx_streamer,
+ time_zero+args.setup_delay,
+ )
+ print("Preloading set commands...")
+ load_commands(
+ usrp=usrp,
+ chan=args.channel,
+ cmd_time=time_zero+args.setup_delay+args.set_delay,
+ freq=args.new_freq,
+ gain=args.new_gain,
+ )
+ print("Starting receive...")
+ samps = recv_samples(rx_streamer, total_num_samps, skip_samps)
+ if samps is None:
+ return False
+ print("Received {} samples.".format(samps.size))
+ print("New settings are applied at sample index {}."
+ .format(int((args.set_delay - args.skip_time) * args.rate)))
+ if args.plot:
+ plot_samps(
+ samps,
+ args.rate,
+ args.set_delay - args.skip_time,
+ )
+ if args.output_file:
+ save_to_file(
+ samps,
+ args.output_file,
+ args.numpy,
+ )
+ return True
+
+if __name__ == "__main__":
+ exit(not main())
+