diff options
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/gr-usrptest/apps/uhd_phase_alignment.py | 545 |
1 files changed, 545 insertions, 0 deletions
diff --git a/tools/gr-usrptest/apps/uhd_phase_alignment.py b/tools/gr-usrptest/apps/uhd_phase_alignment.py new file mode 100755 index 000000000..ff36c4e7c --- /dev/null +++ b/tools/gr-usrptest/apps/uhd_phase_alignment.py @@ -0,0 +1,545 @@ +#!/usr/bin/env python +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +UHD Phase Alignment: Phase alignment test using the UHD Python API. +""" + + +import argparse +from builtins import input +from datetime import datetime, timedelta +import itertools as itt +import sys +import time +import logging +import numpy as np +import numpy.random as npr +import uhd + + +CLOCK_TIMEOUT = 1000 # 1000mS timeout for external clock locking +INIT_DELAY = 0.05 # 50mS initial delay before transmit +CMD_DELAY = 0.05 # set a 50mS delay in commands +NUM_RETRIES = 10 # Number of retries on a given trial before giving up +# TODO: Add support for TX phase alignment + + +def parse_args(): + """Parse the command line arguments""" + description = """UHD Phase Alignment (Python API) + + Currently only supports RX phase alignment + + Example usage: + - Setup: 2x X310's (one with dboard in slot A, one in slot B) + + uhd_phase_alignment.py --args addr0=ADDR0,addr1=ADDR1 --rate 5e6 --gain 30 + --start-freq 1e9 --stop-freq 2e9 --freq-bands 3 + --clock-source external --time-source external --sync pps + --subdev "A:0" "A:0" --runs 3 --duration 1.0 + + Note: when specifying --subdev, put each mboard's subdev in "" + """ + # TODO: Add gain steps! + parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter, + description=description) + # Standard device args + parser.add_argument("--args", default="", type=str, + help="UHD device address args (requires 2 MBoards)") + parser.add_argument("--rate", type=float, default=5e6, + help="specify to perform a rate test (sps)") + parser.add_argument("--gain", type=float, default=10., + help="specify a gain setting for the device") + parser.add_argument("--channels", default=[0, 1], nargs="+", type=int, + help="which channel(s) to use " + "(specify 0 1 or 0 1 2 3)") + parser.add_argument("--duration", default=0.25, type=float, + help="duration for each capture in seconds") + parser.add_argument("--runs", default=10, type=int, + help="Number of times to retune and measure phase alignment") + # Test configuration + parser.add_argument("--start-freq", type=float, required=True, + help="specify a minimum frequency") + parser.add_argument("--stop-freq", type=float, required=True, + help="specify a maximum frequency") + parser.add_argument("--freq-bands", type=float, required=True, + help="specify the number of frequency bands to test") + parser.add_argument("--start-power", type=float, default=-30, + help="specify a starting output power for the siggen (dBm)") + parser.add_argument("--power-step", type=float, default=0, + help="specify the increase in siggen output power at each step") + parser.add_argument("--tone-offset", type=float, default=1e6, + help="Frequency offset of the input signal (ie. the " + "difference between the device's center frequency " + "and the test tone)") + parser.add_argument("--drift-threshold", type=float, default=2., + help="Maximum frequency drift (deg) while testing a given frequency") + parser.add_argument("--stddev-threshold", type=float, default=2., + help="Maximum frequency deviation (deg) over a single receive call") + # Device configuration + parser.add_argument("--clock-source", type=str, + help="clock reference (internal, external, mimo, gpsdo)") + parser.add_argument("--time-source", type=str, + help="PPS source (internal, external, mimo, gpsdo)") + parser.add_argument("--sync", type=str, default="default", + #choices=["default", "pps", "mimo"], + help="Method to synchronize devices)") + parser.add_argument("--subdev", type=str, nargs="+", + help="Subdevice(s) of UHD device where appropriate. Use " + "a space-separated list to set different boards to " + "different specs.") + # Extra, advanced arguments + parser.add_argument("--plot", default=False, action="store_true", + help="Plot results") + parser.add_argument("--save", default=False, action="store_true", + help="Save each set of samples") + parser.add_argument("--easy-tune", type=bool, default=True, + help="Round the target frequency to the nearest MHz") + args = parser.parse_args() + + # Do some sanity checking + if args.tone_offset >= (args.rate / 2): + logger.warning("Tone offset may be outside the received bandwidth!") + + return args + + +class LogFormatter(logging.Formatter): + """Log formatter which prints the timestamp with fractional seconds""" + @staticmethod + def pp_now(): + """Returns a formatted string containing the time of day""" + now = datetime.now() + return "{:%H:%M}:{:05.2f}".format(now, now.second + now.microsecond / 1e6) + + def formatTime(self, record, datefmt=None): + converter = self.converter(record.created) + if datefmt: + formatted_date = converter.strftime(datefmt) + else: + formatted_date = LogFormatter.pp_now() + return formatted_date + + +def setup_ref(usrp, ref, num_mboards): + """Setup the reference clock""" + if ref == "mimo": + if num_mboards != 2: + logger.error("ref = \"mimo\" implies 2 motherboards; " + "your system has %d boards", num_mboards) + return False + usrp.set_clock_source("mimo", 1) + else: + usrp.set_clock_source(ref) + + # Lock onto clock signals for all mboards + if ref != "internal": + logger.debug("Now confirming lock on clock signals...") + end_time = datetime.now() + timedelta(milliseconds=CLOCK_TIMEOUT) + for i in range(num_mboards): + if ref == "mimo" and i == 0: + continue + is_locked = usrp.get_mboard_sensor("ref_locked", i) + while (not is_locked) and (datetime.now() < end_time): + time.sleep(1e-3) + is_locked = usrp.get_mboard_sensor("ref_locked", i) + if not is_locked: + logger.error("Unable to confirm clock signal locked on board %d", i) + return False + return True + + +def setup_pps(usrp, pps, num_mboards): + """Setup the PPS source""" + if pps == "mimo": + if num_mboards != 2: + logger.error("ref = \"mimo\" implies 2 motherboards; " + "your system has %d boards", num_mboards) + return False + # make mboard 1 a slave over the MIMO Cable + usrp.set_time_source("mimo", 1) + else: + usrp.set_time_source(pps) + return True + + +def setup_usrp(args): + """Create, configure, and return the device + + The USRP object that is returned will be synchronized and ready to receive. + """ + usrp = uhd.usrp.MultiUSRP(args.args) + + # Always select the subdevice first, the channel mapping affects the other settings + if args.subdev: + assert len(args.subdev) == usrp.get_num_mboards(),\ + "Please specify a subdevice spec for each mboard" + for mb_idx in range(usrp.get_num_mboards()): + usrp.set_rx_subdev_spec(uhd.usrp.SubdevSpec(args.subdev[mb_idx]), mb_idx) + + else: + logger.warning("No RX subdev specs set! Please ensure that the correct " + "connections are being used.") + + logger.info("Using Device: %s", usrp.get_pp_string()) + + # Set the reference clock + if args.clock_source and not setup_ref(usrp, args.clock_source, usrp.get_num_mboards()): + # If we wanted to set a reference clock and it failed, return + return None + + # Set the PPS source + if args.time_source and not setup_pps(usrp, args.time_source, usrp.get_num_mboards()): + # If we wanted to set a PPS source and it failed, return + return None + # At this point, we can assume our device has valid and locked clock and PPS + + # Determine channel settings + # TODO: Add support for >2 channels! (TwinRX) + if len(args.channels) != 2: + logger.error("Must select 2 channels! (%s selected)", args.channels) + return None + logger.info("Selected %s RX channels", args.channels if args.channels else "no") + # Set the sample rate + for chan in args.channels: + usrp.set_rx_rate(args.rate, chan) + + # Actually synchronize devices + # We already know we have >=2 channels, so don't worry about that + if args.sync in ['default', "pps"]: + logger.info("Setting device timestamp to 0...") + usrp.set_time_unknown_pps(uhd.types.TimeSpec(0.0)) + elif args.sync == 'mimo': + # For MIMO, we want to set the time on the master and let it propogate + # through the MIMO cable + usrp.set_time_now(uhd.types.TimeSpec(0.0), 0) + time.sleep(1) + logger.info("Current device timestamp: %.8f", + usrp.get_time_now().get_real_secs()) + else: + # This should never happen- argparse choices should handle this + logger.error("Invalid sync option for given configuration: %s", args.sync) + return None + + return usrp + + +def get_band_limits(start_freq, stop_freq, freq_bands): + """Return an array of length `freq_bands + 1`. + Each element marks the start of a frequency band (Hz). + Bands are equal sized (not log or anything fancy). + The last element is the stop frequency. + ex. get_band_limits(10., 100., 2) => [10., 55., 100.] + """ + return np.linspace(start_freq, stop_freq, freq_bands+1, endpoint=True) + + +def window(seq, width=2): + """Returns a sliding window (of `width` elements) over data from the iterable. + s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... + Itertools example found at https://docs.python.org/release/2.3.5/lib/itertools-example.html + """ + seq_iter = iter(seq) + result = tuple(itt.islice(seq_iter, width)) + if len(result) == width: + yield result + for elem in seq_iter: + result = result[1:] + (elem,) + yield result + + +def generate_time_spec(usrp, time_delta=0.05): + """Return a TimeSpec for now + `time_delta`""" + return usrp.get_time_now() + uhd.types.TimeSpec(time_delta) + + +def tune_siggen(freq, power_lvl): + """Tune the signal generator to output the correct tone""" + # TODO: support actual RTS equipment, or any automated way + input("Please tune the signal generator to {:.3f} MHz and {:.1f} dBm, " + "then press Enter".format(freq / 1e6, power_lvl)) + + +def tune_usrp(usrp, freq, channels, delay=CMD_DELAY): + """Synchronously set the device's frequency""" + usrp.set_command_time(generate_time_spec(usrp, time_delta=delay)) + for chan in channels: + usrp.set_rx_freq(uhd.types.TuneRequest(freq), chan) + + +def recv_aligned_num_samps(usrp, streamer, num_samps, freq, channels=(0,)): + """ + RX a finite number of samples from the USRP + :param usrp: MultiUSRP object + :param streamer: RX streamer object + :param num_samps: number of samples to RX + :param freq: RX frequency (Hz) + :param channels: list of channels to RX on + :return: numpy array of complex floating-point samples (fc32) + """ + # Allocate a sample buffer + result = np.empty((len(channels), num_samps), dtype=np.complex64) + + # Tune to the desired frequency + tune_usrp(usrp, freq, channels) + + metadata = uhd.types.RXMetadata() + buffer_samps = streamer.get_max_num_samps() * 10 + recv_buffer = np.zeros( + (len(channels), buffer_samps), dtype=np.complex64) + recv_samps = 0 + + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) + stream_cmd.stream_now = False + stream_cmd.time_spec = generate_time_spec(usrp) + stream_cmd.num_samps = num_samps + streamer.issue_stream_cmd(stream_cmd) + logger.debug("Sending stream command for T=%.2f", stream_cmd.time_spec.get_real_secs()) + + samps = np.array([], dtype=np.complex64) + while recv_samps < num_samps: + samps = streamer.recv(recv_buffer, metadata) + + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + # If we get a timeout, retry MAX_TIMEOUTS times + if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: + logger.error("%s (%d samps recv'd)", metadata.strerror(), recv_samps) + recv_samps = 0 + break + + real_samps = min(num_samps - recv_samps, samps) + result[:, recv_samps:recv_samps + real_samps] = recv_buffer[:, 0:real_samps] + recv_samps += real_samps + + logger.debug("Stopping stream") + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) + streamer.issue_stream_cmd(stream_cmd) + + logger.debug("Flushing stream") + # Flush the remainder of the samples + while samps: + samps = streamer.recv(recv_buffer, metadata) + + if recv_samps < num_samps: + logger.warning("Received too few samples, returning an empty array") + return np.array([], dtype=np.complex64) + return result + + +def plot_samps(samps, alignment): + """ + Show a nice plot of samples and their phase alignment + """ + try: + import pylab as plt + except ImportError: + logger.error("--plot requires pylab.") + return + + plt.tick_params(axis="both", labelsize=20) + # Plot the samples + plt.plot(samps[0][1000:2000].real, 'b') + plt.plot(samps[1][1000:2000].real, 'r') + plt.title("Phase Aligned RX", fontsize=44) + plt.legend(["Device A", "Device B"], fontsize=24) + plt.ylabel("Amplitude (real)", fontsize=35) + plt.xlabel("Time (us)", fontsize=35) + plt.show() + # Plot the alignment + logger.info("plotting alignment") + plt.plot(alignment) + plt.title("Phase Difference between Devices", fontsize=40) + plt.ylabel("Phase Delta (radian)", fontsize=30) + plt.xlabel("Time (us)", fontsize=30) + plt.ylim([-np.pi, np.pi]) + plt.show() + + +def check_results(alignment_stats, drift_thresh, stddev_thresh): + """Print the alignment stats in a nice way + + alignment_stats should be a dictionary of the following form: + {test_freq : [list of runs], ...} + ... the list of runs takes the form: + [{dictionary of run statistics}, ...] + ... the run dictionary has the following keys: + mean, stddev, min, max, test_freq, run_freq + ... whose values are all floats + """ + success = True # Whether or not we've exceeded a threshold + msg = "" + for freq, stats_list in alignment_stats.items(): + # Try to grab the test frequency for the frequency band + try: + test_freq = stats_list[0].get("test_freq") + except (KeyError, IndexError): + test_freq = 0. + logger.error("Failed to find test frequency for test band %.2fMHz", freq) + msg += "=== Frequency band starting at {:.2f}MHz. ===\n".format(freq/1e6) + msg += "Test Frequency: {:.2f}MHz ===\n".format(test_freq/1e6) + + # Allocate a list so we can calulate the drift over a set of runs + mean_list = [] + + for run_dict in stats_list: + run_freq = run_dict.get("run_freq", 0.) + # Convert mean and stddev to degrees + mean_deg = run_dict.get("mean", 0.) * 180 / np.pi + stddev_deg = run_dict.get("stddev", 0.) * 180 / np.pi + if stddev_deg > stddev_thresh: + success = False + + msg += "{:.2f}MHz<-{:.2f}MHz: {:.3f} deg +- {:.3f}\n".format( + test_freq/1e6, run_freq/1e6, mean_deg, stddev_deg + ) + mean_list.append(mean_deg) + + # Report the largest difference in mean values of runs + # FIXME: This won't work around +-180 deg + max_drift = max(mean_list) - min(mean_list) + if max_drift > drift_thresh: + success = False + msg += "--Maximum drift over runs: {:.2f} degrees\n".format(max_drift) + # Print a newline to separate frequency bands + msg += "\n" + + logger.info("Printing statistics!\n%s", msg) + return success + + +def main(): + """RX samples and write to file""" + args = parse_args() + + # Setup a usrp device + usrp = setup_usrp(args) + if usrp is None: + return False + + ### General test description ### + # 1. Split the frequency range of our device into bands. For each of these + # bands, we'll pick a random frequency within the band to be our test + # frequency. + # 2. Again split the frequency range of our device into bands, this time + # using the number of trials we want to run to split the range. Pick a + # random frequency within each run band. Tune to that run frequency, then + # back to our test frequency. + # 3. Receive synchronized samples, and determine the phase alignment. Report + # statistics based on the alignment. + # 4. Once we've iterated through each test frequency, determine whether or + # not the test passed or failed. + + # Determine the frequency bands we need to test + # TODO: allow users to specify test frequencies in args + freq_bands = get_band_limits(args.start_freq, args.stop_freq, args.freq_bands) + # Frequency bands to tune away to + # TODO: make this based on the device's frequency range. This requires + # additional Python API bindings. + run_bands = get_band_limits(args.start_freq, args.stop_freq, args.runs) + + nsamps = int(args.duration * args.rate) + st_args = uhd.usrp.StreamArgs("fc32", "sc16") + st_args.channels = args.channels + streamer = usrp.get_rx_stream(st_args) + + # Make a big dictionary to store all of the reported statistics + # Keys are the starting test frequency of the band + # Values are lists of dictionaries of statistics + all_alignment_stats = {} + # Test phase alignment in each test frequency band + current_power = args.start_power + for freq_start, freq_stop in window(freq_bands): + # Pick a random center frequency between the start and stop frequencies + tune_freq = npr.uniform(freq_start, freq_stop) + if args.easy_tune: + # Round to the nearest MHz + tune_freq = np.round(tune_freq, -6) + # Request the SigGen tune to our test frequency plus some offset away + # the device's LO + tune_siggen(tune_freq + args.tone_offset, current_power) + + # This is where the magic happens! + # Store phase alignment statistics as a list of dictionaries + alignment_stats = [] + for tune_away_start, tune_away_stop in window(run_bands): + # Try to get samples + for i in range(NUM_RETRIES): + # Tune to a random frequency in each of the frequency bands... + tune_away_freq = npr.uniform(tune_away_start, tune_away_stop) + tune_usrp(usrp, tune_away_freq, args.channels) + time.sleep(0.5) + + logger.info("Receiving samples, take %d, (%.2fMHz -> %.2fMHz)", + i, tune_away_freq/1e6, tune_freq/1e6) + + # Then tune back to our desired test frequency, and receive samples + samps = recv_aligned_num_samps(usrp, + streamer, + nsamps, + tune_freq, + args.channels) + if samps.size >= nsamps: + break + else: + streamer = None # Help the garbage collector + time.sleep(1) + streamer = usrp.get_rx_stream(st_args) + + # If we have failed to get good samples, put an empty dict in the stats + else: + logger.error("Failed to receive aligned samples!") + alignment_stats.append({}) + continue + + alignment = np.angle(np.conj(samps[0]) * samps[1])[500:] + + if args.plot: + plot_samps(samps, alignment,) + + if args.save: + # TODO: add frequency data + date_now = datetime.utcnow() + epoch = datetime(1970, 1, 1) + utc_now = int((date_now - epoch).total_seconds()) + np.savez("phaseAligned_{}.npz".format(utc_now), samps) + + # Store the phase alignment stats + alignment_stats.append({ + "mean": np.mean(alignment), + # Subtract the mean before calculating the stddev so we don't + # have rollover errors + "stddev": np.std(alignment - np.mean(alignment)), + "min": alignment.min(), + "max": alignment.max(), + "test_freq": tune_freq, + "run_freq": tune_away_freq + }) + run_means = [run_stats.get("mean", 0.) for run_stats in alignment_stats] + run_stddevs = [run_stats.get("stddev", 0.) for run_stats in alignment_stats] + logger.debug("Test freq %.3fMHz health check: %.1f deg drift, %.2f deg max stddev", + tune_freq/1e6, + max(run_means) - min(run_means), # FIXME: This won't work around +-180 deg + max(run_stddevs) + ) + all_alignment_stats[freq_start] = alignment_stats + # Increment the power level for the next run + current_power += args.power_step + + return check_results(all_alignment_stats, args.drift_threshold, args.stddev_threshold) + + +if __name__ == "__main__": + # Setup the logger with our custom timestamp formatting + global logger + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) + console = logging.StreamHandler() + logger.addHandler(console) + formatter = LogFormatter(fmt='[%(asctime)s] [%(levelname)s] %(message)s') + console.setFormatter(formatter) + + sys.exit(not main()) |