diff options
| -rwxr-xr-x | tools/gr-usrptest/apps/uhd_phase_alignment.py | 545 | 
1 files changed, 545 insertions, 0 deletions
diff --git a/tools/gr-usrptest/apps/uhd_phase_alignment.py b/tools/gr-usrptest/apps/uhd_phase_alignment.py new file mode 100755 index 000000000..ff36c4e7c --- /dev/null +++ b/tools/gr-usrptest/apps/uhd_phase_alignment.py @@ -0,0 +1,545 @@ +#!/usr/bin/env python +# +# Copyright 2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +UHD Phase Alignment: Phase alignment test using the UHD Python API. +""" + + +import argparse +from builtins import input +from datetime import datetime, timedelta +import itertools as itt +import sys +import time +import logging +import numpy as np +import numpy.random as npr +import uhd + + +CLOCK_TIMEOUT = 1000  # 1000mS timeout for external clock locking +INIT_DELAY = 0.05  # 50mS initial delay before transmit +CMD_DELAY = 0.05  # set a 50mS delay in commands +NUM_RETRIES = 10  # Number of retries on a given trial before giving up +# TODO: Add support for TX phase alignment + + +def parse_args(): +    """Parse the command line arguments""" +    description = """UHD Phase Alignment (Python API) + +    Currently only supports RX phase alignment + +    Example usage: +    - Setup: 2x X310's (one with dboard in slot A, one in slot B) + +    uhd_phase_alignment.py --args addr0=ADDR0,addr1=ADDR1 --rate 5e6 --gain 30 +                           --start-freq 1e9 --stop-freq 2e9 --freq-bands 3 +                           --clock-source external --time-source external --sync pps +                           --subdev "A:0" "A:0" --runs 3 --duration 1.0 + +    Note: when specifying --subdev, put each mboard's subdev in "" +    """ +    # TODO: Add gain steps! +    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter, +                                     description=description) +    # Standard device args +    parser.add_argument("--args", default="", type=str, +                        help="UHD device address args (requires 2 MBoards)") +    parser.add_argument("--rate", type=float, default=5e6, +                        help="specify to perform a rate test (sps)") +    parser.add_argument("--gain", type=float, default=10., +                        help="specify a gain setting for the device") +    parser.add_argument("--channels", default=[0, 1], nargs="+", type=int, +                        help="which channel(s) to use " +                             "(specify 0 1 or 0 1 2 3)") +    parser.add_argument("--duration", default=0.25, type=float, +                        help="duration for each capture in seconds") +    parser.add_argument("--runs", default=10, type=int, +                        help="Number of times to retune and measure phase alignment") +    # Test configuration +    parser.add_argument("--start-freq", type=float, required=True, +                        help="specify a minimum frequency") +    parser.add_argument("--stop-freq", type=float, required=True, +                        help="specify a maximum frequency") +    parser.add_argument("--freq-bands", type=float, required=True, +                        help="specify the number of frequency bands to test") +    parser.add_argument("--start-power", type=float, default=-30, +                        help="specify a starting output power for the siggen (dBm)") +    parser.add_argument("--power-step", type=float, default=0, +                        help="specify the increase in siggen output power at each step") +    parser.add_argument("--tone-offset", type=float, default=1e6, +                        help="Frequency offset of the input signal (ie. the " +                             "difference between the device's center frequency " +                             "and the test tone)") +    parser.add_argument("--drift-threshold", type=float, default=2., +                        help="Maximum frequency drift (deg) while testing a given frequency") +    parser.add_argument("--stddev-threshold", type=float, default=2., +                        help="Maximum frequency deviation (deg) over a single receive call") +    # Device configuration +    parser.add_argument("--clock-source", type=str, +                        help="clock reference (internal, external, mimo, gpsdo)") +    parser.add_argument("--time-source", type=str, +                        help="PPS source (internal, external, mimo, gpsdo)") +    parser.add_argument("--sync", type=str, default="default", +                        #choices=["default", "pps", "mimo"], +                        help="Method to synchronize devices)") +    parser.add_argument("--subdev", type=str, nargs="+", +                        help="Subdevice(s) of UHD device where appropriate. Use " +                             "a space-separated list to set different boards to " +                             "different specs.") +    # Extra, advanced arguments +    parser.add_argument("--plot", default=False, action="store_true", +                        help="Plot results") +    parser.add_argument("--save", default=False, action="store_true", +                        help="Save each set of samples") +    parser.add_argument("--easy-tune", type=bool, default=True, +                        help="Round the target frequency to the nearest MHz") +    args = parser.parse_args() + +    # Do some sanity checking +    if args.tone_offset >= (args.rate / 2): +        logger.warning("Tone offset may be outside the received bandwidth!") + +    return args + + +class LogFormatter(logging.Formatter): +    """Log formatter which prints the timestamp with fractional seconds""" +    @staticmethod +    def pp_now(): +        """Returns a formatted string containing the time of day""" +        now = datetime.now() +        return "{:%H:%M}:{:05.2f}".format(now, now.second + now.microsecond / 1e6) + +    def formatTime(self, record, datefmt=None): +        converter = self.converter(record.created) +        if datefmt: +            formatted_date = converter.strftime(datefmt) +        else: +            formatted_date = LogFormatter.pp_now() +        return formatted_date + + +def setup_ref(usrp, ref, num_mboards): +    """Setup the reference clock""" +    if ref == "mimo": +        if num_mboards != 2: +            logger.error("ref = \"mimo\" implies 2 motherboards; " +                         "your system has %d boards", num_mboards) +            return False +        usrp.set_clock_source("mimo", 1) +    else: +        usrp.set_clock_source(ref) + +    # Lock onto clock signals for all mboards +    if ref != "internal": +        logger.debug("Now confirming lock on clock signals...") +        end_time = datetime.now() + timedelta(milliseconds=CLOCK_TIMEOUT) +        for i in range(num_mboards): +            if ref == "mimo" and i == 0: +                continue +            is_locked = usrp.get_mboard_sensor("ref_locked", i) +            while (not is_locked) and (datetime.now() < end_time): +                time.sleep(1e-3) +                is_locked = usrp.get_mboard_sensor("ref_locked", i) +            if not is_locked: +                logger.error("Unable to confirm clock signal locked on board %d", i) +                return False +    return True + + +def setup_pps(usrp, pps, num_mboards): +    """Setup the PPS source""" +    if pps == "mimo": +        if num_mboards != 2: +            logger.error("ref = \"mimo\" implies 2 motherboards; " +                         "your system has %d boards", num_mboards) +            return False +        # make mboard 1 a slave over the MIMO Cable +        usrp.set_time_source("mimo", 1) +    else: +        usrp.set_time_source(pps) +    return True + + +def setup_usrp(args): +    """Create, configure, and return the device + +    The USRP object that is returned will be synchronized and ready to receive. +    """ +    usrp = uhd.usrp.MultiUSRP(args.args) + +    # Always select the subdevice first, the channel mapping affects the other settings +    if args.subdev: +        assert len(args.subdev) == usrp.get_num_mboards(),\ +            "Please specify a subdevice spec for each mboard" +        for mb_idx in range(usrp.get_num_mboards()): +            usrp.set_rx_subdev_spec(uhd.usrp.SubdevSpec(args.subdev[mb_idx]), mb_idx) + +    else: +        logger.warning("No RX subdev specs set! Please ensure that the correct " +                       "connections are being used.") + +    logger.info("Using Device: %s", usrp.get_pp_string()) + +    # Set the reference clock +    if args.clock_source and not setup_ref(usrp, args.clock_source, usrp.get_num_mboards()): +        # If we wanted to set a reference clock and it failed, return +        return None + +    # Set the PPS source +    if args.time_source and not setup_pps(usrp, args.time_source, usrp.get_num_mboards()): +        # If we wanted to set a PPS source and it failed, return +        return None +    # At this point, we can assume our device has valid and locked clock and PPS + +    # Determine channel settings +    # TODO: Add support for >2 channels! (TwinRX) +    if len(args.channels) != 2: +        logger.error("Must select 2 channels! (%s selected)", args.channels) +        return None +    logger.info("Selected %s RX channels", args.channels if args.channels else "no") +    # Set the sample rate +    for chan in args.channels: +        usrp.set_rx_rate(args.rate, chan) + +    # Actually synchronize devices +    # We already know we have >=2 channels, so don't worry about that +    if args.sync in ['default', "pps"]: +        logger.info("Setting device timestamp to 0...") +        usrp.set_time_unknown_pps(uhd.types.TimeSpec(0.0)) +    elif args.sync == 'mimo': +        # For MIMO, we want to set the time on the master and let it propogate +        # through the MIMO cable +        usrp.set_time_now(uhd.types.TimeSpec(0.0), 0) +        time.sleep(1) +        logger.info("Current device timestamp: %.8f", +                    usrp.get_time_now().get_real_secs()) +    else: +        # This should never happen- argparse choices should handle this +        logger.error("Invalid sync option for given configuration: %s", args.sync) +        return None + +    return usrp + + +def get_band_limits(start_freq, stop_freq, freq_bands): +    """Return an array of length `freq_bands + 1`. +    Each element marks the start of a frequency band (Hz). +    Bands are equal sized (not log or anything fancy). +    The last element is the stop frequency. +    ex. get_band_limits(10., 100., 2) => [10., 55., 100.] +    """ +    return np.linspace(start_freq, stop_freq, freq_bands+1, endpoint=True) + + +def window(seq, width=2): +    """Returns a sliding window (of `width` elements) over data from the iterable. +    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... +    Itertools example found at https://docs.python.org/release/2.3.5/lib/itertools-example.html +    """ +    seq_iter = iter(seq) +    result = tuple(itt.islice(seq_iter, width)) +    if len(result) == width: +        yield result +    for elem in seq_iter: +        result = result[1:] + (elem,) +        yield result + + +def generate_time_spec(usrp, time_delta=0.05): +    """Return a TimeSpec for now + `time_delta`""" +    return usrp.get_time_now() + uhd.types.TimeSpec(time_delta) + + +def tune_siggen(freq, power_lvl): +    """Tune the signal generator to output the correct tone""" +    # TODO: support actual RTS equipment, or any automated way +    input("Please tune the signal generator to {:.3f} MHz and {:.1f} dBm, " +          "then press Enter".format(freq / 1e6, power_lvl)) + + +def tune_usrp(usrp, freq, channels, delay=CMD_DELAY): +    """Synchronously set the device's frequency""" +    usrp.set_command_time(generate_time_spec(usrp, time_delta=delay)) +    for chan in channels: +        usrp.set_rx_freq(uhd.types.TuneRequest(freq), chan) + + +def recv_aligned_num_samps(usrp, streamer, num_samps, freq, channels=(0,)): +    """ +    RX a finite number of samples from the USRP +    :param usrp: MultiUSRP object +    :param streamer: RX streamer object +    :param num_samps: number of samples to RX +    :param freq: RX frequency (Hz) +    :param channels: list of channels to RX on +    :return: numpy array of complex floating-point samples (fc32) +    """ +    # Allocate a sample buffer +    result = np.empty((len(channels), num_samps), dtype=np.complex64) + +    # Tune to the desired frequency +    tune_usrp(usrp, freq, channels) + +    metadata = uhd.types.RXMetadata() +    buffer_samps = streamer.get_max_num_samps() * 10 +    recv_buffer = np.zeros( +        (len(channels), buffer_samps), dtype=np.complex64) +    recv_samps = 0 + +    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) +    stream_cmd.stream_now = False +    stream_cmd.time_spec = generate_time_spec(usrp) +    stream_cmd.num_samps = num_samps +    streamer.issue_stream_cmd(stream_cmd) +    logger.debug("Sending stream command for T=%.2f", stream_cmd.time_spec.get_real_secs()) + +    samps = np.array([], dtype=np.complex64) +    while recv_samps < num_samps: +        samps = streamer.recv(recv_buffer, metadata) + +        if metadata.error_code != uhd.types.RXMetadataErrorCode.none: +            # If we get a timeout, retry MAX_TIMEOUTS times +            if metadata.error_code == uhd.types.RXMetadataErrorCode.timeout: +                logger.error("%s (%d samps recv'd)", metadata.strerror(), recv_samps) +                recv_samps = 0 +                break + +        real_samps = min(num_samps - recv_samps, samps) +        result[:, recv_samps:recv_samps + real_samps] = recv_buffer[:, 0:real_samps] +        recv_samps += real_samps + +    logger.debug("Stopping stream") +    stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) +    streamer.issue_stream_cmd(stream_cmd) + +    logger.debug("Flushing stream") +    # Flush the remainder of the samples +    while samps: +        samps = streamer.recv(recv_buffer, metadata) + +    if recv_samps < num_samps: +        logger.warning("Received too few samples, returning an empty array") +        return np.array([], dtype=np.complex64) +    return result + + +def plot_samps(samps, alignment): +    """ +    Show a nice plot of samples and their phase alignment +    """ +    try: +        import pylab as plt +    except ImportError: +        logger.error("--plot requires pylab.") +        return + +    plt.tick_params(axis="both", labelsize=20) +    # Plot the samples +    plt.plot(samps[0][1000:2000].real, 'b') +    plt.plot(samps[1][1000:2000].real, 'r') +    plt.title("Phase Aligned RX", fontsize=44) +    plt.legend(["Device A", "Device B"], fontsize=24) +    plt.ylabel("Amplitude (real)", fontsize=35) +    plt.xlabel("Time (us)", fontsize=35) +    plt.show() +    # Plot the alignment +    logger.info("plotting alignment") +    plt.plot(alignment) +    plt.title("Phase Difference between Devices", fontsize=40) +    plt.ylabel("Phase Delta (radian)", fontsize=30) +    plt.xlabel("Time (us)", fontsize=30) +    plt.ylim([-np.pi, np.pi]) +    plt.show() + + +def check_results(alignment_stats, drift_thresh, stddev_thresh): +    """Print the alignment stats in a nice way + +    alignment_stats should be a dictionary of the following form: +    {test_freq : [list of runs], ...} +    ... the list of runs takes the form: +    [{dictionary of run statistics}, ...] +    ... the run dictionary has the following keys: +    mean, stddev, min, max, test_freq, run_freq +    ... whose values are all floats +    """ +    success = True  # Whether or not we've exceeded a threshold +    msg = "" +    for freq, stats_list in alignment_stats.items(): +        # Try to grab the test frequency for the frequency band +        try: +            test_freq = stats_list[0].get("test_freq") +        except (KeyError, IndexError): +            test_freq = 0. +            logger.error("Failed to find test frequency for test band %.2fMHz", freq) +        msg += "=== Frequency band starting at {:.2f}MHz. ===\n".format(freq/1e6) +        msg += "Test Frequency: {:.2f}MHz ===\n".format(test_freq/1e6) + +        # Allocate a list so we can calulate the drift over a set of runs +        mean_list = [] + +        for run_dict in stats_list: +            run_freq = run_dict.get("run_freq", 0.) +            # Convert mean and stddev to degrees +            mean_deg = run_dict.get("mean", 0.) * 180 / np.pi +            stddev_deg = run_dict.get("stddev", 0.) * 180 / np.pi +            if stddev_deg > stddev_thresh: +                success = False + +            msg += "{:.2f}MHz<-{:.2f}MHz: {:.3f} deg +- {:.3f}\n".format( +                test_freq/1e6, run_freq/1e6, mean_deg, stddev_deg +            ) +            mean_list.append(mean_deg) + +        # Report the largest difference in mean values of runs +        # FIXME: This won't work around +-180 deg +        max_drift = max(mean_list) - min(mean_list) +        if max_drift > drift_thresh: +            success = False +        msg += "--Maximum drift over runs: {:.2f} degrees\n".format(max_drift) +        # Print a newline to separate frequency bands +        msg += "\n" + +    logger.info("Printing statistics!\n%s", msg) +    return success + + +def main(): +    """RX samples and write to file""" +    args = parse_args() + +    # Setup a usrp device +    usrp = setup_usrp(args) +    if usrp is None: +        return False + +    ### General test description ### +    # 1. Split the frequency range of our device into bands. For each of these +    #    bands, we'll pick a random frequency within the band to be our test +    #    frequency. +    # 2. Again split the frequency range of our device into bands, this time +    #    using the number of trials we want to run to split the range. Pick a +    #    random frequency within each run band. Tune to that run frequency, then +    #    back to our test frequency. +    # 3. Receive synchronized samples, and determine the phase alignment. Report +    #    statistics based on the alignment. +    # 4. Once we've iterated through each test frequency, determine whether or +    #    not the test passed or failed. + +    # Determine the frequency bands we need to test +    # TODO: allow users to specify test frequencies in args +    freq_bands = get_band_limits(args.start_freq, args.stop_freq, args.freq_bands) +    # Frequency bands to tune away to +    # TODO: make this based on the device's frequency range. This requires +    #       additional Python API bindings. +    run_bands = get_band_limits(args.start_freq, args.stop_freq, args.runs) + +    nsamps = int(args.duration * args.rate) +    st_args = uhd.usrp.StreamArgs("fc32", "sc16") +    st_args.channels = args.channels +    streamer = usrp.get_rx_stream(st_args) + +    # Make a big dictionary to store all of the reported statistics +    # Keys are the starting test frequency of the band +    # Values are lists of dictionaries of statistics +    all_alignment_stats = {} +    # Test phase alignment in each test frequency band +    current_power = args.start_power +    for freq_start, freq_stop in window(freq_bands): +        # Pick a random center frequency between the start and stop frequencies +        tune_freq = npr.uniform(freq_start, freq_stop) +        if args.easy_tune: +            # Round to the nearest MHz +            tune_freq = np.round(tune_freq, -6) +        # Request the SigGen tune to our test frequency plus some offset away +        # the device's LO +        tune_siggen(tune_freq + args.tone_offset, current_power) + +        # This is where the magic happens! +        # Store phase alignment statistics as a list of dictionaries +        alignment_stats = [] +        for tune_away_start, tune_away_stop in window(run_bands): +            # Try to get samples +            for i in range(NUM_RETRIES): +                # Tune to a random frequency in each of the frequency bands... +                tune_away_freq = npr.uniform(tune_away_start, tune_away_stop) +                tune_usrp(usrp, tune_away_freq, args.channels) +                time.sleep(0.5) + +                logger.info("Receiving samples, take %d, (%.2fMHz -> %.2fMHz)", +                            i, tune_away_freq/1e6, tune_freq/1e6) + +                # Then tune back to our desired test frequency, and receive samples +                samps = recv_aligned_num_samps(usrp, +                                               streamer, +                                               nsamps, +                                               tune_freq, +                                               args.channels) +                if samps.size >= nsamps: +                    break +                else: +                    streamer = None # Help the garbage collector +                    time.sleep(1) +                    streamer = usrp.get_rx_stream(st_args) + +            # If we have failed to get good samples, put an empty dict in the stats +            else: +                logger.error("Failed to receive aligned samples!") +                alignment_stats.append({}) +                continue + +            alignment = np.angle(np.conj(samps[0]) * samps[1])[500:] + +            if args.plot: +                plot_samps(samps, alignment,) + +            if args.save: +                # TODO: add frequency data +                date_now = datetime.utcnow() +                epoch = datetime(1970, 1, 1) +                utc_now = int((date_now - epoch).total_seconds()) +                np.savez("phaseAligned_{}.npz".format(utc_now), samps) + +            # Store the phase alignment stats +            alignment_stats.append({ +                "mean": np.mean(alignment), +                # Subtract the mean before calculating the stddev so we don't +                #     have rollover errors +                "stddev": np.std(alignment - np.mean(alignment)), +                "min": alignment.min(), +                "max": alignment.max(), +                "test_freq": tune_freq, +                "run_freq": tune_away_freq +            }) +        run_means = [run_stats.get("mean", 0.) for run_stats in alignment_stats] +        run_stddevs = [run_stats.get("stddev", 0.) for run_stats in alignment_stats] +        logger.debug("Test freq %.3fMHz health check: %.1f deg drift, %.2f deg max stddev", +                     tune_freq/1e6, +                     max(run_means) - min(run_means), # FIXME: This won't work around +-180 deg +                     max(run_stddevs) +                    ) +        all_alignment_stats[freq_start] = alignment_stats +        # Increment the power level for the next run +        current_power += args.power_step + +    return check_results(all_alignment_stats, args.drift_threshold, args.stddev_threshold) + + +if __name__ == "__main__": +    # Setup the logger with our custom timestamp formatting +    global logger +    logger = logging.getLogger(__name__) +    logger.setLevel(logging.DEBUG) +    console = logging.StreamHandler() +    logger.addHandler(console) +    formatter = LogFormatter(fmt='[%(asctime)s] [%(levelname)s] %(message)s') +    console.setFormatter(formatter) + +    sys.exit(not main())  | 
