diff options
author | Martin Braun <martin.braun@ettus.com> | 2020-04-15 15:10:37 -0700 |
---|---|---|
committer | Aaron Rossetto <aaron.rossetto@ni.com> | 2020-05-28 15:05:19 -0500 |
commit | 65fbf053c059c888fe544327f841313641561255 (patch) | |
tree | 396d0b156f533d0c4d3a64288398af48102b247b /host/python | |
parent | 56f04e4283cf53803a5994e57364cce89232e545 (diff) | |
download | uhd-65fbf053c059c888fe544327f841313641561255.tar.gz uhd-65fbf053c059c888fe544327f841313641561255.tar.bz2 uhd-65fbf053c059c888fe544327f841313641561255.zip |
utils/python: Add uhd_power_cal script
This is a tool for running power calibration.
Diffstat (limited to 'host/python')
-rw-r--r-- | host/python/uhd/__init__.py | 1 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/__init__.py | 3 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/meas_device.py | 353 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/tone_gen.py | 61 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/usrp_calibrator.py | 408 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/visa.py | 129 |
6 files changed, 955 insertions, 0 deletions
diff --git a/host/python/uhd/__init__.py b/host/python/uhd/__init__.py index e12a0626a..8b6a2b36c 100644 --- a/host/python/uhd/__init__.py +++ b/host/python/uhd/__init__.py @@ -11,4 +11,5 @@ from . import types from . import usrp from . import filters from . import rfnoc +from . import dsp from .libpyuhd.paths import * diff --git a/host/python/uhd/usrp/cal/__init__.py b/host/python/uhd/usrp/cal/__init__.py index 77cc3ca35..53de91114 100644 --- a/host/python/uhd/usrp/cal/__init__.py +++ b/host/python/uhd/usrp/cal/__init__.py @@ -14,3 +14,6 @@ Python UHD Module: Calibration sub-module # pylint: disable=wildcard-import from .libtypes import * # pylint: enable=wildcard-import + +from .meas_device import get_meas_device +from .usrp_calibrator import get_usrp_calibrator diff --git a/host/python/uhd/usrp/cal/meas_device.py b/host/python/uhd/usrp/cal/meas_device.py new file mode 100644 index 000000000..22fe78c11 --- /dev/null +++ b/host/python/uhd/usrp/cal/meas_device.py @@ -0,0 +1,353 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Measurement Device Class for UHD Power Calibration +""" + +import sys +import time +import inspect +import importlib +import numpy +import uhd +from .tone_gen import ToneGenerator +from .visa import get_visa_device + +############################################################################### +# Base Classes +############################################################################### +class PowerMeterBase: + """ + Base class for measuring output power (Tx) of the USRP. That means the + measurement device is receiving and the USRP (the DUT) is transmitting. + """ + def __init__(self, options): + self._options = options + self.power_offset = 0 + + def set_frequency(self, freq): + """ + Set the frequency of the measurement device. + """ + raise NotImplementedError() + + def get_power(self): + """ + Return the current measured power in dBm. + """ + return self._get_power() + self.power_offset + + def _get_power(self): + """ + Return the current measured power in dBm. + """ + raise NotImplementedError() + + # pylint: disable=no-self-use + def update_port(self, chan, antenna): + """ + Tell the device we're measuring chan + antenna next + """ + input("[TX] Connect your power meter to device channel {}, " + "antenna {}. Then, hit Enter.".format(chan, antenna)) + # pylint: enable=no-self-use + +class SignalGeneratorBase: + """ + Base class for measuring input power (Rx) of the USRP. That means the + measurement device is transmitting and the USRP (the DUT) is receiving. + """ + def __init__(self, options): + self._options = options + self.power_offset = 0 + # Make sure to set this before doing RX cal + self.max_output_power = None + + def enable(self, enb=True): + """ + Turn on the power generator. By default, it should be off, and only + produce a signal when this was called with an argument value of 'True'. + """ + raise NotImplementedError() + + def set_power(self, power_dbm): + """ + Set the input power of the DUT. This will factor in the power offset, + and set the measurement device to produce the power that will cause the + DUT to receive power_dbm. + + This will coerce to the next possible power available and return the + coerced value. + """ + assert self.max_output_power + if power_dbm > self.max_output_power: + print("[SigGen] WARNING! Trying to set power beyond safe levels. " + "Capping output power at {} dBm.".format(self.max_output_power)) + power_dbm = self.max_output_power + return self._set_power(power_dbm + self.power_offset) - self.power_offset + + def get_power(self): + """ + Return the input power of the DUT. This will factor in the power offset, + and will return the power level in dBm that is going into the DUT. + Use this with set_power(), as not all power levels can be reached. + """ + return self._get_power() - self.power_offset + + def set_frequency(self, freq): + """ + Set the center frequency of the generated signal. + """ + raise NotImplementedError() + + def _set_power(self, power_dbm): + """ + Set the output power of the device in dBm. + """ + raise NotImplementedError() + + def _get_power(self): + """ + Return the output power of the measurement device. + """ + raise NotImplementedError() + + # pylint: disable=no-self-use + def update_port(self, chan, antenna): + """ + Tell the device we're measuring chan + antenna next + """ + input("[RX] Connect your signal generator to device channel {}, " + "antenna {}. Then, hit Enter.".format(chan, antenna)) + # pylint: enable=no-self-use + + +############################################################################### +# Manual Measurement: For masochists, or for small sample sets +############################################################################### +class ManualPowerMeter(PowerMeterBase): + """ + Manual measurement: The script does nothing, it just asks the user to + manually make changes and return values + """ + key = 'manual' + + def set_frequency(self, freq): + """ + Ask user to set frequency + """ + input("[TX] Set your power meter to following frequency: " + "{:.3f} MHz, then hit Enter.".format(freq/1e6)) + + def _get_power(self): + """ + Ask user for the power + """ + num_tries = 5 + for _ in range(num_tries): + try: + return float(input("[TX] Please enter the measured power in dBm: ")) \ + + self.power_offset + except ValueError: + continue + raise ValueError("Invalid power value entered.") + +class ManualPowerGenerator(SignalGeneratorBase): + """ + Manual measurement: The script does nothing, it just asks the user to + manually make changes and return values + """ + key = 'manual' + num_tries = 5 + + def enable(self, enable=True): + """ + Ask the user to turn the device on or off + """ + input("[RX] Please {} your signal generator and hit Enter." + .format("enable" if enable else "disable")) + + def _set_power(self, power_dbm): + """ + Ask for a power, or the closest, and return that + """ + new_power = input( + "[RX] Set your signal generator to following output power: " + "{:.1f} dBm, then hit Enter, or enter the closest available power: " + .format(power_dbm)) + if not new_power: + return power_dbm + for _ in range(self.num_tries): + try: + return float(new_power) + except ValueError: + new_power = input( + "[RX] Set your signal generator to following output power: " + "{:.1f} dBm, then hit Enter, or enter the closest available power: " + .format(power_dbm)) + if not new_power: + return power_dbm + raise ValueError("Invalid power value entered.") + + def _get_power(self): + """ + Ask user for current power + """ + for _ in range(self.num_tries): + try: + return float(input( + "[RX] Please enter the output power in dBm of your " + "signal generator: ")) + except ValueError: + continue + raise ValueError("Invalid power value entered.") + + # pylint: disable=no-self-use + def set_frequency(self, freq): + """ + Set the center frequency of the generated signal. + """ + input("[RX] Set your signal generator to following frequency: {:.3f} MHz, then hit Enter." + .format(freq/1e6)) + # pylint: enable=no-self-use + +############################################################################### +# VISA: Run through a VISA device, using SCPI commands +############################################################################### +class VisaPowerMeter(PowerMeterBase): + """ + VISA based Tx measurement device + """ + DEFAULT_VISA_LIB = '@py' # pyvisa-py + DEFAULT_VISA_QUERY = "?*::INSTR" + + key = 'visa' + + def __init__(self, options): + super().__init__(options) + # pylint: disable=import-outside-toplevel + # We disable this warning because having pyvisa installed is not a + # requirement, so we want to load it as late as possible, and only when + # needed. + import pyvisa + # pylint: enable=import-outside-toplevel + visa_lib = options.get('visa_lib', self.DEFAULT_VISA_LIB) + visa_query = options.get('visa_query', self.DEFAULT_VISA_QUERY) + self._rm = pyvisa.ResourceManager(visa_lib) + resources = self._rm.list_resources(visa_query) + if len(resources) > 1: + print("Found VISA devices:") + for resource in resources: + print("*" + resource) + raise RuntimeError( + "Found more than one measurement device. Please limit the query!") + if len(resources) == 0: + raise RuntimeError("No measurement device found!") + self._res = self._rm.open_resource(resources[0]) + self.visa = get_visa_device(self._res, resources[0], options) + self.visa.init_power_meter() + + def set_frequency(self, freq): + """ + Set frequency + """ + self.visa.set_frequency(freq) + + def _get_power(self): + """ + Get power + """ + return self.visa.get_power_dbm() + +############################################################################### +# USRP: Use a pre-calibrated USRP as a measurement device +############################################################################### +class USRPPowerGenerator(SignalGeneratorBase): + """ + The power generator is actually a USRP. This only works if the USRP that is + used for power/signal generation has been previously calbrated itself. + """ + key = 'usrp' + + def __init__(self, options): + super().__init__(options) + self._usrp = uhd.usrp.MultiUSRP(options.get('args')) + self._rate = float(options.get('rate', 5e6)) + self._lo_offset = float(options.get('lo_offset', 0)) + self._chan = int(options.get('chan', 0)) + self._amplitude = float(options.get('ampl', 1/numpy.sqrt(2))) + self._pwr_dbfs = 20 * numpy.log10(self._amplitude) + self._tone_freq = 0 + stream_args = uhd.usrp.StreamArgs('fc32', 'sc16') + stream_args.channels = [self._chan] + self._streamer = self._usrp.get_tx_stream(stream_args) + print("==== Creating USRP tone generator. Power offset:", self.power_offset) + self._tone_gen = ToneGenerator(self._rate, self._tone_freq, self._amplitude) + self._tone_gen.set_streamer(self._streamer) + + def enable(self, enb=True): + """ + Turn the tone generator on or off. + """ + if enb: + print("[SigGen] Starting tone generator.") + self._tone_gen.start() + else: + print("[SigGen] Stopping tone generator.") + self._tone_gen.stop() + time.sleep(0.1) # Give it some time to spin or down + + def set_frequency(self, freq): + """ + Set the center frequency of the generated signal. + """ + print("[SigGen] Channel {}: Tuning signal to {:.3f} MHz." + .format(self._chan, freq/1e6)) + tune_req = uhd.types.TuneRequest(freq, self._lo_offset) + self._usrp.set_tx_freq(tune_req, self._chan) + + def _set_power(self, power_dbm): + """ + Set the output power of the device in dBm. + """ + self._usrp.set_tx_power_reference(power_dbm - self._pwr_dbfs, self._chan) + return self._get_power() + + def _get_power(self): + """ + Return the output power of the measurement device. + """ + return self._usrp.get_tx_power_reference(self._chan) + self._pwr_dbfs + +############################################################################### +# The dispatch function +############################################################################### +def get_meas_device(direction, dev_key, options): + """ + Return the measurement device object + """ + assert direction in ('tx', 'rx') + base_class = SignalGeneratorBase if direction == 'rx' else PowerMeterBase + opt_dict = { + k[0]: k[1] if len(k) > 1 else None for k in [x.split("=", 1) for x in options] + } + members = inspect.getmembers(sys.modules[__name__]) + if 'import' in opt_dict: + try: + print("Loading external module: {}".format(opt_dict.get('import'))) + external_module = importlib.import_module(opt_dict.get('import')) + members += inspect.getmembers(external_module) + except (ModuleNotFoundError, ImportError): + print("WARNING: Could not import module '{}'" + .format(opt_dict.get('import'))) + for _, obj in members: + try: + if issubclass(obj, base_class) and dev_key == getattr(obj, 'key', ''): + return obj(opt_dict) + except TypeError: + continue + raise RuntimeError("No {} measurement device found for key: {}".format( + direction.upper(), dev_key)) diff --git a/host/python/uhd/usrp/cal/tone_gen.py b/host/python/uhd/usrp/cal/tone_gen.py new file mode 100644 index 000000000..b9d986132 --- /dev/null +++ b/host/python/uhd/usrp/cal/tone_gen.py @@ -0,0 +1,61 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Provide a tone generator class for USRPs. +""" + +import threading +import numpy +import uhd + +class ToneGenerator: + """ + Class that can output a tone from a different thread until told to stop + """ + def __init__(self, rate, freq, ampl, streamer=None): + self._streamer = streamer + self._buffer = uhd.dsp.signals.get_continuous_tone(rate, freq, ampl) + self._run = False + self._thread = None + + def set_streamer(self, streamer): + """ + Update streamer object + """ + if self._run: + self.stop() + self._streamer = streamer + + def start(self): + """ + Spawn the thread in the background + """ + if not self._streamer: + raise RuntimeError("No streamer defined!") + self._run = True + self._thread = threading.Thread(target=self._worker) + self._thread.start() + self._thread.setName("cal_tx") + + def stop(self): + """ + Stop the transmitter + """ + self._run = False + self._thread.join() + self._thread = None + + def _worker(self): + """ Here is where the action happens """ + metadata = uhd.types.TXMetadata() + while self._run: + # Give it a long-ish timeout so we don't have to throttle in here + if self._streamer.send(self._buffer, metadata, 1.0) != len(self._buffer): + print("WARNING: Failed to transmit entire buffer in ToneGenerator!") + # Send an EOB packet with a single zero-valued sample to close out TX + metadata.end_of_burst = True + self._streamer.send( + numpy.array([0], dtype=numpy.complex64), metadata, 0.1) diff --git a/host/python/uhd/usrp/cal/usrp_calibrator.py b/host/python/uhd/usrp/cal/usrp_calibrator.py new file mode 100644 index 000000000..a70747201 --- /dev/null +++ b/host/python/uhd/usrp/cal/usrp_calibrator.py @@ -0,0 +1,408 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +UHD Power Cal: USRP Calbration Utility Objects +""" + +import time +import inspect +import sys +import numpy +import uhd + +from . import database +from .tone_gen import ToneGenerator + +NUM_SAMPS_PER_EST = int(1e6) +# Limits for the power estimation algorithm. For good estimates, we want the +# signal to be at -6 dBFS, but not outside of an upper or lower limit. +PWR_EST_LLIM = -20 +PWR_EST_IDEAL_LEVEL = -6 +PWR_EST_ULIM = -3 +SIGPWR_LOCK_MAX_ITER = 4 + +# The default distance between frequencies at which we measure +DEFAULT_FREQ_STEP = 10e6 # Hz +DEFAULT_SAMP_RATE = 5e6 + +def get_streamer(usrp, direction, chan): + """ + Create an appropriate streamer object for this channel + """ + stream_args = uhd.usrp.StreamArgs('fc32', 'sc16') + stream_args.channels = [chan] + return usrp.get_rx_stream(stream_args) if direction == 'rx' \ + else usrp.get_tx_stream(stream_args) + +def get_default_gains(direction, gain_range, gain_step): + """ + Create a equidistant gain range for calibration + """ + assert direction in ('rx', 'tx') + if direction == 'tx': + return numpy.arange(0, gain_range.stop(), gain_step) + return numpy.arange(gain_range.stop(), 0, -gain_step) + +def get_usrp_power(streamer, num_samps=NUM_SAMPS_PER_EST, chan=0): + """ + Return the measured input power in dBFS + + The return value is a list of dBFS power values, one per channel. + """ + recv_buffer = numpy.zeros( + (streamer.get_num_channels(), num_samps), dtype=numpy.complex64) + metadata = uhd.types.RXMetadata() + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done) + stream_cmd.num_samps = num_samps + stream_cmd.stream_now = True + streamer.issue_stream_cmd(stream_cmd) + # Pass in long timeout, so we can rx the entire buffer in one go + samps_recvd = streamer.recv(recv_buffer, metadata, 5.0) + if samps_recvd != num_samps: + raise RuntimeError( + "ERROR! get_usrp_power(): Did not receive the correct number of samples!") + return uhd.dsp.signals.get_power_dbfs(recv_buffer[chan]) + + +def subtract_power(p1_db, p2_db): + """ + Return the power of p1 subtracted from p2, where both are given in logarithmic + units. + """ + return 10 * numpy.log10(10**(p1_db/10) - 10**(p2_db/10)) + +############################################################################### +# Base Class +############################################################################### +class USRPCalibratorBase(object): + """ + Base class for device calibration. Every USRP that can do power calibration + needs to implement this. + """ + # These should be overriden to device-specific values + max_input_power = -20 # -20 happens to be a safe value for all devices + min_detectable_signal = -70 + default_rate = DEFAULT_SAMP_RATE + lo_offset = 0.0 + + def __init__(self, usrp, meas_dev, direction, **kwargs): + self._usrp = usrp + self._meas_dev = meas_dev + # This makes sure our measurement will not destroy the DUT + self._meas_dev.max_output_power = self.max_input_power + self._dir = direction + self._desired_gain_step = kwargs.get('gain_step', 10) + self._id = usrp.get_usrp_rx_info(0).get('mboard_id') + self._mb_serial = usrp.get_usrp_rx_info(0).get('mboard_serial') + # Littler helper to print stuff with a device ID prefix. + self.log = lambda *args, **kwargs: print("[{}]".format(self._id), *args, **kwargs) + # Channel, antenna, and streamer will get updated in update_port() + self._chan = None + self._ant = "" + self._streamer = None + # These dictionaries store the results that get written out as well as + # the noise floor for reference + self.results = {} # This must be of the form results[freq][gain] = power + self._noise = {} + # The tone generator object is only needed for Tx measurements, and is + # initialized conditionaly in init() + self._tone_gen = None + # The gains can be overridden by the device if non-equidistant gains are + # desired. However, gains must be increasing order for Tx measurements, + # and in decreasing order for Rx measurements. + self._gains = get_default_gains( + direction, + getattr(self._usrp, 'get_{}_gain_range'.format(self._dir))(), + self._desired_gain_step) + # You might want to override this, but it's not important. It will + # become the 'name' argument for the power cal factory. + self.cal_name = self._id + " Power Cal" + # The child class can store temperature and ref gain here + self.temp = None + self.ref_gain = None + + def init(self, rate, tone_freq, amplitude): + """ + Initialize device with finalized values. Not that __init__() needs to + finish before the call site knows the rate, so we can' fold this into + __init__(). + """ + if self._dir == 'tx': + self._tone_gen = ToneGenerator(rate, tone_freq, amplitude) + + def update_port(self, chan, antenna): + """ + Notify the device that we've switched channel and/or antenna. + """ + self.log("Switching to channel {}, antenna {}.".format(chan, antenna)) + self._ant = antenna + if chan != self._chan: + # This will be an RX streamer for RX power cal, and a TX streamer + # for TX power cal. + self._streamer = get_streamer(self._usrp, self._dir, chan) + if self._dir == 'tx': + self._tone_gen.set_streamer(self._streamer) + self._chan = chan + + def _get_frequencies(self, start_hint=None, stop_hint=None, step_hint=None): + """ + Return an iterable of frequencies for testing. + + The default will check the hints against the device, but otherwise heed + them. + + If a particular device needs to check specific frequencies, then + override this. + """ + start_min = \ + getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))( + self._chan).start() + start_hint = start_hint or start_min + start = max(start_hint, start_min) + stop_max = \ + getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))( + self._chan).stop() + stop_hint = stop_hint or stop_max + stop = min(stop_hint, stop_max) + step = step_hint or DEFAULT_FREQ_STEP + return numpy.arange(start, stop + step, step) + + def init_frequencies(self, start_hint, stop_hint, step_hint): + """ + Return an iterable of frequencies for testing. + + The default will check the hints against the device, but otherwise heed + them. + + Then it will measure the noise floor across frequency to get a good + baseline measurement. + """ + freqs = self._get_frequencies(start_hint, stop_hint, step_hint) + if self._dir == 'tx': + print("===== Measuring noise floor across frequency...") + for freq in freqs: + self._meas_dev.set_frequency(freq) + self._noise[freq] = self._meas_dev.get_power() + print("[TX] Noise floor: {:2} MHz => {:+6.2f} dBm" + .format(freq/1e6, self._noise[freq])) + else: # Rx + print("===== Measuring noise floor across frequency and gain...") + for freq in freqs: + self._noise[freq] = {} + tune_req = uhd.types.TuneRequest(freq) + self._usrp.set_rx_freq(tune_req, self._chan) + for gain in self._gains: + self._usrp.set_rx_gain(gain, self._chan) + self._noise[freq][gain] = get_usrp_power(self._streamer) + print("[RX] Noise floor: {:2} MHz / {} dB => {:+6.2f} dBFS" + .format(freq/1e6, gain, self._noise[freq][gain])) + return freqs + + def start(self): + """ + Initialize the device for calibration + """ + if self._dir == 'tx': + self._tone_gen.start() + else: + self._meas_dev.enable(True) + + def stop(self, store=True): + """ + Shut down the device after calibration + """ + if self._dir == 'tx': + self._tone_gen.stop() + else: + self._meas_dev.enable(False) + if store: + self.store() + + def run_rx_cal(self, freq): + """ + Run the actual RX calibration for this frequency. + """ + # Go to highest gain, lock in signal generator + self._usrp.set_rx_gain(max(self._gains), self._chan) + time.sleep(0.1) # Settling time of the USRP, highly conservative + self.log("Locking in signal generator power...") + self.log("Requesting input power: {:+.2f} dBm." + .format(self.min_detectable_signal)) + usrp_input_power = self._meas_dev.set_power(self.min_detectable_signal) + recvd_power = get_usrp_power(self._streamer) + self.log("Got input power: {:+.2f} dBm. Received power: {:.2f} dBFS. " + "Requesting new input power: {:+.2f} dBm." + .format(usrp_input_power, + recvd_power, + usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power)) + usrp_input_power = self._meas_dev.set_power( + usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power) + siggen_locked = False + for _ in range(SIGPWR_LOCK_MAX_ITER): + recvd_power = get_usrp_power(self._streamer) + if PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM: + siggen_locked = True + break + self.log("Receiving input power: {:+.2f} dBFS.".format(recvd_power)) + power_delta = PWR_EST_IDEAL_LEVEL - recvd_power + # Update power output by the delta from the desired input value: + self.log("Requesting input power: {:+.2f} dBm." + .format(usrp_input_power + power_delta)) + usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta) + if not siggen_locked: + raise RuntimeError( + "Unable to lock siggen within {} iterations! Last input power level: {:+6.2f} dBm." + .format(SIGPWR_LOCK_MAX_ITER, usrp_input_power)) + self.log("Locked signal generator in at input power level: {:+6.2f} dBm." + .format(usrp_input_power)) + # Now iterate through gains + results = {} + # Gains are in decreasing order! + last_gain = self._gains[0] + for gain in self._gains: + self._usrp.set_rx_gain(gain, self._chan) # Set the new gain + self.log("Set gain to: {} dB. Got gain: {} dB." + .format(gain, self._usrp.get_rx_gain(self._chan))) + time.sleep(0.1) # Settling time of the USRP, highly conservative + gain_delta = last_gain - gain # This is our gain step + if gain_delta: + # If we decrease the device gain, we need to crank up the input + # power + usrp_input_power = self._meas_dev.set_power( + min(usrp_input_power + gain_delta, self.max_input_power)) + # usrp_input_power = self._meas_dev.set_power(usrp_input_power + gain_delta) + self.log("New input power is: {:+.2f} dBm".format(usrp_input_power)) + recvd_power = get_usrp_power(self._streamer) + self.log("Received power: {:.2f} dBFS".format(recvd_power)) + # It's possible that we lose the lock on the signal power, so allow + # for a correction + if not PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM: + power_delta = PWR_EST_IDEAL_LEVEL - recvd_power + self.log("Adapting input power to: {:+.2f} dBm." + .format(usrp_input_power + power_delta)) + usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta) + self.log("New input power is: {:+.2f} dBm".format(usrp_input_power)) + # And then of course, measure again + recvd_power = get_usrp_power(self._streamer) + self.log("Received power: {:.2f} dBFS".format(recvd_power)) + # Note: The noise power should be way down there, and really + # shouldn't matter. We subtract it anyway for formal correctness. + recvd_signal_power = subtract_power(recvd_power, self._noise[freq][gain]) + # A note on the following equation: 'recvd_signal_power' is in dBFS, + # and usrp_input_power is in dBm. However, this is the reference + # signal, so we need the power (in dBm) that corresponds to 0 dBFS. + # The assumption is that digital gain is linear, so what we really + # want is usrp_input_power - (recvd_signal_power - 0dBFS), and the + # result of the equation is in dBm again. We omit the subtract-by-zero + # since our variables don't have units. + results[gain] = usrp_input_power - recvd_signal_power + self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain])) + # If we get too close to the noise floor, we stop + if recvd_power - self._noise[freq][gain] <= 1.5: + self.log("Can no longer detect input signal. Terminating.") + break + self.results[freq] = results + + def run_tx_cal(self, freq): + """ + Run the actual TX calibration for this frequency. + """ + results = {} + for gain in self._gains: + self._usrp.set_tx_gain(gain, self._chan) + time.sleep(0.1) # Settling time of the USRP, highly conservative + results[gain] = self._meas_dev.get_power() + self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain])) + self.results[freq] = results + + def store(self): + """ + Return the results object + """ + chan_info = getattr(self._usrp, "get_usrp_{}_info".format(self._dir))(self._chan) + cal_key = chan_info.get("{}_ref_power_key".format(self._dir)) + cal_serial = chan_info.get("{}_ref_power_serial".format(self._dir)) + cal_data = uhd.usrp.cal.PwrCal(self.cal_name, cal_serial, int(time.time())) + if self.temp: + cal_data.set_temperature(self.temp) + if self.ref_gain: + cal_data.set_ref_gain(self.ref_gain) + for freq, results in self.results.items(): + max_power = max(results.values()) + min_power = min(results.values()) + cal_data.add_power_table(results, min_power, max_power, freq) + database.write_cal_data( + cal_key, + cal_serial, + cal_data.serialize()) + self.results = {} + +class B200Calibrator(USRPCalibratorBase): + """ + B200 calibration + """ + mboard_ids = ('B200', 'B210', 'B200mini', 'B205mini') + # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive + # a tone. By default, the auto MCR will kick in and set the MCR to 40 Msps, + # thus engaging all halfbands. + default_rate = 5e6 + # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within + # our estimate. B200 generally has good DC offset / IQ balance performance, + # but we still try and avoid DC as much as possible. + lo_offset = 10e6 + + def __init__(self, usrp, meas_dev, direction, **kwargs): + super().__init__(usrp, meas_dev, direction, **kwargs) + print("===== Reading temperature... ", end="") + self.temp = self._usrp.get_rx_sensor("temp").to_int() + print("{} C".format(self.temp)) + # TODO don't hard code + self.ref_gain = 60 + +class X300Calibrator(USRPCalibratorBase): + """ + X300/X310 calibration + + Notes / TODOs: + - The X310 must avoid frequencies that are multiples of 200 MHz; here be + harmonics (TODO) + - TwinRX needs its own special method. It needs to be run once with one + channel in the streamer, and once with two channels in the streamer. Or we + come up with a clever way of enabling the other channel without modifying + the streamer. A poke to the prop tree might do the trick. + """ + mboard_ids = ('X300', 'X310', 'NI-2974') + # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive + # a tone. It's 1/40 the master clock rate, which means it'll engage max + # halfbands. + default_rate = 5e6 + # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within + # our estimate, so it doesn't matter if this device is DC offset / IQ balance + # calibrated. + lo_offset = 10e6 + + +############################################################################### +# The dispatch function +############################################################################### +def get_usrp_calibrator(usrp, meas_dev, direction, **kwargs): + """ + Return a USRP calibrator object. + """ + usrp_type = \ + getattr(usrp, 'get_usrp_{}_info'.format(direction))().get('mboard_id') + if usrp_type is None: + raise RuntimeError("Could not determine USRP type!") + print("=== Detected USRP type:", usrp_type) + for _, obj in inspect.getmembers(sys.modules[__name__]): + try: + if issubclass(obj, USRPCalibratorBase) \ + and usrp_type in getattr(obj, 'mboard_ids', ''): + return obj(usrp, meas_dev, direction, **kwargs) + except TypeError: + continue + raise RuntimeError("No USRP calibrator object found for device type: {}" + .format(usrp_type)) diff --git a/host/python/uhd/usrp/cal/visa.py b/host/python/uhd/usrp/cal/visa.py new file mode 100644 index 000000000..16c2aff56 --- /dev/null +++ b/host/python/uhd/usrp/cal/visa.py @@ -0,0 +1,129 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +VISA-type measurement devices +""" + +import re +import sys +import inspect +import importlib + +class VISADevice: + """ + Parent class for VISA/SCPI devices (that can be accessed via PyVISA) + """ + # Must contain a dictionary res_ids. See class below as an example + + def __init__(self, resource): + self.res = resource + # You can query the exact model etc. like this: + # id = self._res.query("*IDN?").strip( + + def init_power_meter(self): + """ + Initialize this device to measure power. + """ + raise NotImplementedError() + + def init_signal_generator(self): + """ + Initialize this device to generate signals. + """ + raise NotImplementedError() + + def set_frequency(self, freq): + """ + Set frequency + """ + raise NotImplementedError() + + def get_power_dbm(self): + """ + Return the received/measured power in dBm (power meter) or return the + output power it's currently set to (signal generator). + """ + raise NotImplementedError() + + +class USBPowerMeter(VISADevice): + """ + USB-based power measurement devices + """ + # The keys of res_ids are regular expressions, which have to match the + # resource ID of the VISA device. The values are a name for the device, which + # is used for informing the user which driver was found. + # + # A class can match any number of resource IDs. If the commands used depend + # on the specific ID, it can be queried in the appropriate init function + # using the *IDN? command which is understood by all VISA devices. + res_ids = { + r'USB\d+::2733::376::\d+::0::INSTR': 'R&S NRP-6A', + } + + def init_power_meter(self): + """ + Enable the sensor to read power + """ + self.res.timeout = 5000 + self.res.write("SENS:AVER:COUN 20") + self.res.write("SENS:AVER:COUN:AUTO ON") + self.res.write('UNIT:POW DBM') + self.res.write('SENS:FUNC "POW:AVG"') + + def init_signal_generator(self): + """ + This class is for power meters, so no bueno + """ + raise RuntimeError("This device cannot be used for signal generation!") + + def set_frequency(self, freq): + """ + Set frequency + """ + self.res.write('SENS:FREQ {}'.format(freq)) + + def get_power_dbm(self): + """ + Return measured power in dBm + """ + self.res.write('INIT:IMM') + return float(self.res.query('FETCH?')) + +############################################################################### +# The dispatch function +############################################################################### +def get_visa_device(resource, key, opt_dict): + """ + Return the VISA device object + """ + def match_res(obj): + """ + Check if a class obj matches the requested key + """ + for pattern, res_id in getattr(obj, 'res_ids', {}).items(): + if re.match(pattern, key): + print("Found VISA device: {}".format(res_id)) + return True + return False + # Import additional modules if requested + members = inspect.getmembers(sys.modules[__name__]) + if 'import' in opt_dict: + try: + print("Loading external module: {}".format(opt_dict.get('import'))) + external_module = importlib.import_module(opt_dict.get('import')) + members += inspect.getmembers(external_module) + except (ModuleNotFoundError, ImportError): + print("WARNING: Could not import module '{}'" + .format(opt_dict.get('import'))) + # Now browse classes and find one that matches + for _, obj in members: + try: + if issubclass(obj, VISADevice) and match_res(obj): + return obj(resource) + except TypeError: + continue + raise RuntimeError("No VISA device class found for key: {}".format(key)) |