diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/docs/power.dox | 117 | ||||
-rw-r--r-- | host/python/uhd/__init__.py | 1 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/__init__.py | 3 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/meas_device.py | 353 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/tone_gen.py | 61 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/usrp_calibrator.py | 408 | ||||
-rw-r--r-- | host/python/uhd/usrp/cal/visa.py | 129 | ||||
-rw-r--r-- | host/utils/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/utils/uhd_power_cal.py | 250 |
9 files changed, 1323 insertions, 0 deletions
diff --git a/host/docs/power.dox b/host/docs/power.dox index 816c74824..7ba93fb61 100644 --- a/host/docs/power.dox +++ b/host/docs/power.dox @@ -8,6 +8,11 @@ Starting with UHD 4, UHD comes with reference power level APIs. These allow to not just set a relative gain level, but configure a USRP to transmit a certain power, or to estimate received power levels. +\b DISCLAIMER: USRPs are not factory-calibrated test and measurement devices, +but general purpose SDR devices. As such, they are not intended to replace +factory-calibrated power meters or signal generators, nor are they suitable +devices for doing so. + The actual transmitted or received power also depends on the transmitted signal itself. The absolute power setting is thus a *reference power level*. The reference power level maps a digital signal level (in dBFS) to an absolute, @@ -150,5 +155,117 @@ These tables can be stored in three different ways: - On a file. Calibration data in a file is the most flexible, it can be replaced easily. It is however local to the computer that stores the calibration data. +Access to the calibration data usually is done by using the uhd::usrp::cal::database +class. Calibration data is identified by two identifiers, a key, and a serial. + +The \b key is a string that identifies a particular hardware path for an RF signal. +For example, the B200 and B210 use the key `b2xx_power_cal_rx_rx2` to identify +the RF path from the RX2 SMA connector on a B200mini all the way to the RFIC. On +the B210, the same key is used for channels A and B, because the RF paths are +identical (the PCB traces are symmetrical, and the components are the same as well). +On the B200mini however, the RF path from RX2 to the RFIC is different (the PCB +is smaller, for example) and thus the key is different (`b2xxmini_power_cal_rx_rx2`). + +The \b serial is usually derived from the serial of the DUT itself, but may also +include other information, such as the channel. On the B210, the calibration +serial consists of the motherboard serial plus the channel identifier (for +example, if the device had a serial of `FFF1234`, the two calibration serials +would be `FFF1234#A` and `FFF1234#B`. This way, the two channels can have their +own calibration data. + +The key and serial that are used for a specific device can be queried from the +device by either calling uhd::usrp::multi_usrp::get_usrp_rx_info() when using +the multi_usrp API, or calling uhd::rfnoc::radio_control::get_rx_power_ref_keys(). +Equivalent calls for TX calibration are also available. + +If calibration data is hard-coded as part of UHD, the serial doesn't apply. +That is because the only calibration data hard-coded in UHD is data that can be +applied to all devices, and has been vetted against several measurement data +sets. Such data will carry a much higher calibration error than specifically +generated calibration data. + +\section power_usercal Calibrating your own device + +If UHD does not ship its own calibration data for a device, or if the power +calbration must be finely tuned, it is necessary to manually calbrate the device. + +In order to calibrate the transmit power, a calibrated power meter is required. +To calibrate the receive power, a calibrated signal generator is required. Note +that it is possible to use a calibrated USRP as power meter / signal generator. +A calibrated USRP can thus be used to calibrate another USRP. + +The calibration is performed using the `uhd_power_cal.py` utility, which is +usually installed into the utilities directory (for example, `/usr/lib/uhd/utils` +on some Linux distributions). It requires the Python API of UHD. + +The tool will control both the DUT (i.e., the USRP that is to be calibrated) as +well as the measurement device (power meter or signal generator). +UHD ships with some drivers for measurement devices, but can be extended for +others easily (see \ref power_usercal_extend). + +In order to run a calibration, the measurement device and the DUT need to be +connected to the host PC on which the calbration measurement is performed. +The following command will calibrate a B200 transmit power using a VISA-based +power meter: + + uhd_power_cal.py --args type=b200 -d tx --meas-dev visa + +By default, it will try and calibrate all channels (for a B210). The calibration +can be further constrained by limiting the frequency range, and the gain/frequency +steps (for more coarse or fine calibration data). + +The tool has hard-coded some sensible defaults for most devices, such as frequency +and gain steps. + +\subsection power_usercal_extend Extending the calibration utility for custom drivers + +\subsubsection power_usercal_extend_visa VISA/SCPI based devices + +Measurement devices using SCPI commands are particularly easy to add. UHD uses +PyVISA to access VISA-based devices, so make sure to follow the PyVISA manual +to set that driver up. For example, USB-based power meters may require setting +additional privileges or system settings in order for user-space utilities to +be able to communicate with them. + +Assuming PyVISA is working, and your VISA-based measurement device is reachable +from PyVISA, exposing your VISA-based device is done by creating a Python +module for your device. Assume the file is called `myvisa.py` and has the +following content: + +~~~{.py~ +from uhd.usrp.cal.visa import VISADevice + +class MyVISADevice(VISADevice): + res_ids = {r'::1234::': "My VISA Device"} + + def init_power_meter(self): + self.res.write("INIT") # Put appropriate SCPI commands here + + # ... all other API calls ... +~~~ + +Now you can run the power calibration utility as such: + + uhd_power_cal.py --meas-dev visa -o import=myvisa [other args] + +This will try and import `myvisa`, and will automatically detect classes within +that file. If the VISA device used for calibration matches the resource ID (in +this example, it needs to contain the substring `::1234::`), this class will be +chosen. On success, the utility should print a string like +"Found VISA device: My VISA Device". + +The file `visa.py` within the UHD Python module is a useful reference for +writing custom VISA drivers. + +\subsubsection power_usercal_extend_generic Other measurement devices + +If a measurement device is not a VISA device, the procedure above can still +be used. The only requirement is that the devices can be controlled from Python. +The driver classes must derive either from `uhd.usrp.cal.meas_device.PowerMeterBase` +or `uhd.usrp.cal.meas_device.PowerGeneratorBase`, respectively. + +The file `meas_device.py` in the UHD Python modulues contains the default +drivers, as well as examples and further documentation. + */ // vim:ft=doxygen: diff --git a/host/python/uhd/__init__.py b/host/python/uhd/__init__.py index e12a0626a..8b6a2b36c 100644 --- a/host/python/uhd/__init__.py +++ b/host/python/uhd/__init__.py @@ -11,4 +11,5 @@ from . import types from . import usrp from . import filters from . import rfnoc +from . import dsp from .libpyuhd.paths import * diff --git a/host/python/uhd/usrp/cal/__init__.py b/host/python/uhd/usrp/cal/__init__.py index 77cc3ca35..53de91114 100644 --- a/host/python/uhd/usrp/cal/__init__.py +++ b/host/python/uhd/usrp/cal/__init__.py @@ -14,3 +14,6 @@ Python UHD Module: Calibration sub-module # pylint: disable=wildcard-import from .libtypes import * # pylint: enable=wildcard-import + +from .meas_device import get_meas_device +from .usrp_calibrator import get_usrp_calibrator diff --git a/host/python/uhd/usrp/cal/meas_device.py b/host/python/uhd/usrp/cal/meas_device.py new file mode 100644 index 000000000..22fe78c11 --- /dev/null +++ b/host/python/uhd/usrp/cal/meas_device.py @@ -0,0 +1,353 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Measurement Device Class for UHD Power Calibration +""" + +import sys +import time +import inspect +import importlib +import numpy +import uhd +from .tone_gen import ToneGenerator +from .visa import get_visa_device + +############################################################################### +# Base Classes +############################################################################### +class PowerMeterBase: + """ + Base class for measuring output power (Tx) of the USRP. That means the + measurement device is receiving and the USRP (the DUT) is transmitting. + """ + def __init__(self, options): + self._options = options + self.power_offset = 0 + + def set_frequency(self, freq): + """ + Set the frequency of the measurement device. + """ + raise NotImplementedError() + + def get_power(self): + """ + Return the current measured power in dBm. + """ + return self._get_power() + self.power_offset + + def _get_power(self): + """ + Return the current measured power in dBm. + """ + raise NotImplementedError() + + # pylint: disable=no-self-use + def update_port(self, chan, antenna): + """ + Tell the device we're measuring chan + antenna next + """ + input("[TX] Connect your power meter to device channel {}, " + "antenna {}. Then, hit Enter.".format(chan, antenna)) + # pylint: enable=no-self-use + +class SignalGeneratorBase: + """ + Base class for measuring input power (Rx) of the USRP. That means the + measurement device is transmitting and the USRP (the DUT) is receiving. + """ + def __init__(self, options): + self._options = options + self.power_offset = 0 + # Make sure to set this before doing RX cal + self.max_output_power = None + + def enable(self, enb=True): + """ + Turn on the power generator. By default, it should be off, and only + produce a signal when this was called with an argument value of 'True'. + """ + raise NotImplementedError() + + def set_power(self, power_dbm): + """ + Set the input power of the DUT. This will factor in the power offset, + and set the measurement device to produce the power that will cause the + DUT to receive power_dbm. + + This will coerce to the next possible power available and return the + coerced value. + """ + assert self.max_output_power + if power_dbm > self.max_output_power: + print("[SigGen] WARNING! Trying to set power beyond safe levels. " + "Capping output power at {} dBm.".format(self.max_output_power)) + power_dbm = self.max_output_power + return self._set_power(power_dbm + self.power_offset) - self.power_offset + + def get_power(self): + """ + Return the input power of the DUT. This will factor in the power offset, + and will return the power level in dBm that is going into the DUT. + Use this with set_power(), as not all power levels can be reached. + """ + return self._get_power() - self.power_offset + + def set_frequency(self, freq): + """ + Set the center frequency of the generated signal. + """ + raise NotImplementedError() + + def _set_power(self, power_dbm): + """ + Set the output power of the device in dBm. + """ + raise NotImplementedError() + + def _get_power(self): + """ + Return the output power of the measurement device. + """ + raise NotImplementedError() + + # pylint: disable=no-self-use + def update_port(self, chan, antenna): + """ + Tell the device we're measuring chan + antenna next + """ + input("[RX] Connect your signal generator to device channel {}, " + "antenna {}. Then, hit Enter.".format(chan, antenna)) + # pylint: enable=no-self-use + + +############################################################################### +# Manual Measurement: For masochists, or for small sample sets +############################################################################### +class ManualPowerMeter(PowerMeterBase): + """ + Manual measurement: The script does nothing, it just asks the user to + manually make changes and return values + """ + key = 'manual' + + def set_frequency(self, freq): + """ + Ask user to set frequency + """ + input("[TX] Set your power meter to following frequency: " + "{:.3f} MHz, then hit Enter.".format(freq/1e6)) + + def _get_power(self): + """ + Ask user for the power + """ + num_tries = 5 + for _ in range(num_tries): + try: + return float(input("[TX] Please enter the measured power in dBm: ")) \ + + self.power_offset + except ValueError: + continue + raise ValueError("Invalid power value entered.") + +class ManualPowerGenerator(SignalGeneratorBase): + """ + Manual measurement: The script does nothing, it just asks the user to + manually make changes and return values + """ + key = 'manual' + num_tries = 5 + + def enable(self, enable=True): + """ + Ask the user to turn the device on or off + """ + input("[RX] Please {} your signal generator and hit Enter." + .format("enable" if enable else "disable")) + + def _set_power(self, power_dbm): + """ + Ask for a power, or the closest, and return that + """ + new_power = input( + "[RX] Set your signal generator to following output power: " + "{:.1f} dBm, then hit Enter, or enter the closest available power: " + .format(power_dbm)) + if not new_power: + return power_dbm + for _ in range(self.num_tries): + try: + return float(new_power) + except ValueError: + new_power = input( + "[RX] Set your signal generator to following output power: " + "{:.1f} dBm, then hit Enter, or enter the closest available power: " + .format(power_dbm)) + if not new_power: + return power_dbm + raise ValueError("Invalid power value entered.") + + def _get_power(self): + """ + Ask user for current power + """ + for _ in range(self.num_tries): + try: + return float(input( + "[RX] Please enter the output power in dBm of your " + "signal generator: ")) + except ValueError: + continue + raise ValueError("Invalid power value entered.") + + # pylint: disable=no-self-use + def set_frequency(self, freq): + """ + Set the center frequency of the generated signal. + """ + input("[RX] Set your signal generator to following frequency: {:.3f} MHz, then hit Enter." + .format(freq/1e6)) + # pylint: enable=no-self-use + +############################################################################### +# VISA: Run through a VISA device, using SCPI commands +############################################################################### +class VisaPowerMeter(PowerMeterBase): + """ + VISA based Tx measurement device + """ + DEFAULT_VISA_LIB = '@py' # pyvisa-py + DEFAULT_VISA_QUERY = "?*::INSTR" + + key = 'visa' + + def __init__(self, options): + super().__init__(options) + # pylint: disable=import-outside-toplevel + # We disable this warning because having pyvisa installed is not a + # requirement, so we want to load it as late as possible, and only when + # needed. + import pyvisa + # pylint: enable=import-outside-toplevel + visa_lib = options.get('visa_lib', self.DEFAULT_VISA_LIB) + visa_query = options.get('visa_query', self.DEFAULT_VISA_QUERY) + self._rm = pyvisa.ResourceManager(visa_lib) + resources = self._rm.list_resources(visa_query) + if len(resources) > 1: + print("Found VISA devices:") + for resource in resources: + print("*" + resource) + raise RuntimeError( + "Found more than one measurement device. Please limit the query!") + if len(resources) == 0: + raise RuntimeError("No measurement device found!") + self._res = self._rm.open_resource(resources[0]) + self.visa = get_visa_device(self._res, resources[0], options) + self.visa.init_power_meter() + + def set_frequency(self, freq): + """ + Set frequency + """ + self.visa.set_frequency(freq) + + def _get_power(self): + """ + Get power + """ + return self.visa.get_power_dbm() + +############################################################################### +# USRP: Use a pre-calibrated USRP as a measurement device +############################################################################### +class USRPPowerGenerator(SignalGeneratorBase): + """ + The power generator is actually a USRP. This only works if the USRP that is + used for power/signal generation has been previously calbrated itself. + """ + key = 'usrp' + + def __init__(self, options): + super().__init__(options) + self._usrp = uhd.usrp.MultiUSRP(options.get('args')) + self._rate = float(options.get('rate', 5e6)) + self._lo_offset = float(options.get('lo_offset', 0)) + self._chan = int(options.get('chan', 0)) + self._amplitude = float(options.get('ampl', 1/numpy.sqrt(2))) + self._pwr_dbfs = 20 * numpy.log10(self._amplitude) + self._tone_freq = 0 + stream_args = uhd.usrp.StreamArgs('fc32', 'sc16') + stream_args.channels = [self._chan] + self._streamer = self._usrp.get_tx_stream(stream_args) + print("==== Creating USRP tone generator. Power offset:", self.power_offset) + self._tone_gen = ToneGenerator(self._rate, self._tone_freq, self._amplitude) + self._tone_gen.set_streamer(self._streamer) + + def enable(self, enb=True): + """ + Turn the tone generator on or off. + """ + if enb: + print("[SigGen] Starting tone generator.") + self._tone_gen.start() + else: + print("[SigGen] Stopping tone generator.") + self._tone_gen.stop() + time.sleep(0.1) # Give it some time to spin or down + + def set_frequency(self, freq): + """ + Set the center frequency of the generated signal. + """ + print("[SigGen] Channel {}: Tuning signal to {:.3f} MHz." + .format(self._chan, freq/1e6)) + tune_req = uhd.types.TuneRequest(freq, self._lo_offset) + self._usrp.set_tx_freq(tune_req, self._chan) + + def _set_power(self, power_dbm): + """ + Set the output power of the device in dBm. + """ + self._usrp.set_tx_power_reference(power_dbm - self._pwr_dbfs, self._chan) + return self._get_power() + + def _get_power(self): + """ + Return the output power of the measurement device. + """ + return self._usrp.get_tx_power_reference(self._chan) + self._pwr_dbfs + +############################################################################### +# The dispatch function +############################################################################### +def get_meas_device(direction, dev_key, options): + """ + Return the measurement device object + """ + assert direction in ('tx', 'rx') + base_class = SignalGeneratorBase if direction == 'rx' else PowerMeterBase + opt_dict = { + k[0]: k[1] if len(k) > 1 else None for k in [x.split("=", 1) for x in options] + } + members = inspect.getmembers(sys.modules[__name__]) + if 'import' in opt_dict: + try: + print("Loading external module: {}".format(opt_dict.get('import'))) + external_module = importlib.import_module(opt_dict.get('import')) + members += inspect.getmembers(external_module) + except (ModuleNotFoundError, ImportError): + print("WARNING: Could not import module '{}'" + .format(opt_dict.get('import'))) + for _, obj in members: + try: + if issubclass(obj, base_class) and dev_key == getattr(obj, 'key', ''): + return obj(opt_dict) + except TypeError: + continue + raise RuntimeError("No {} measurement device found for key: {}".format( + direction.upper(), dev_key)) diff --git a/host/python/uhd/usrp/cal/tone_gen.py b/host/python/uhd/usrp/cal/tone_gen.py new file mode 100644 index 000000000..b9d986132 --- /dev/null +++ b/host/python/uhd/usrp/cal/tone_gen.py @@ -0,0 +1,61 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Provide a tone generator class for USRPs. +""" + +import threading +import numpy +import uhd + +class ToneGenerator: + """ + Class that can output a tone from a different thread until told to stop + """ + def __init__(self, rate, freq, ampl, streamer=None): + self._streamer = streamer + self._buffer = uhd.dsp.signals.get_continuous_tone(rate, freq, ampl) + self._run = False + self._thread = None + + def set_streamer(self, streamer): + """ + Update streamer object + """ + if self._run: + self.stop() + self._streamer = streamer + + def start(self): + """ + Spawn the thread in the background + """ + if not self._streamer: + raise RuntimeError("No streamer defined!") + self._run = True + self._thread = threading.Thread(target=self._worker) + self._thread.start() + self._thread.setName("cal_tx") + + def stop(self): + """ + Stop the transmitter + """ + self._run = False + self._thread.join() + self._thread = None + + def _worker(self): + """ Here is where the action happens """ + metadata = uhd.types.TXMetadata() + while self._run: + # Give it a long-ish timeout so we don't have to throttle in here + if self._streamer.send(self._buffer, metadata, 1.0) != len(self._buffer): + print("WARNING: Failed to transmit entire buffer in ToneGenerator!") + # Send an EOB packet with a single zero-valued sample to close out TX + metadata.end_of_burst = True + self._streamer.send( + numpy.array([0], dtype=numpy.complex64), metadata, 0.1) diff --git a/host/python/uhd/usrp/cal/usrp_calibrator.py b/host/python/uhd/usrp/cal/usrp_calibrator.py new file mode 100644 index 000000000..a70747201 --- /dev/null +++ b/host/python/uhd/usrp/cal/usrp_calibrator.py @@ -0,0 +1,408 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +UHD Power Cal: USRP Calbration Utility Objects +""" + +import time +import inspect +import sys +import numpy +import uhd + +from . import database +from .tone_gen import ToneGenerator + +NUM_SAMPS_PER_EST = int(1e6) +# Limits for the power estimation algorithm. For good estimates, we want the +# signal to be at -6 dBFS, but not outside of an upper or lower limit. +PWR_EST_LLIM = -20 +PWR_EST_IDEAL_LEVEL = -6 +PWR_EST_ULIM = -3 +SIGPWR_LOCK_MAX_ITER = 4 + +# The default distance between frequencies at which we measure +DEFAULT_FREQ_STEP = 10e6 # Hz +DEFAULT_SAMP_RATE = 5e6 + +def get_streamer(usrp, direction, chan): + """ + Create an appropriate streamer object for this channel + """ + stream_args = uhd.usrp.StreamArgs('fc32', 'sc16') + stream_args.channels = [chan] + return usrp.get_rx_stream(stream_args) if direction == 'rx' \ + else usrp.get_tx_stream(stream_args) + +def get_default_gains(direction, gain_range, gain_step): + """ + Create a equidistant gain range for calibration + """ + assert direction in ('rx', 'tx') + if direction == 'tx': + return numpy.arange(0, gain_range.stop(), gain_step) + return numpy.arange(gain_range.stop(), 0, -gain_step) + +def get_usrp_power(streamer, num_samps=NUM_SAMPS_PER_EST, chan=0): + """ + Return the measured input power in dBFS + + The return value is a list of dBFS power values, one per channel. + """ + recv_buffer = numpy.zeros( + (streamer.get_num_channels(), num_samps), dtype=numpy.complex64) + metadata = uhd.types.RXMetadata() + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done) + stream_cmd.num_samps = num_samps + stream_cmd.stream_now = True + streamer.issue_stream_cmd(stream_cmd) + # Pass in long timeout, so we can rx the entire buffer in one go + samps_recvd = streamer.recv(recv_buffer, metadata, 5.0) + if samps_recvd != num_samps: + raise RuntimeError( + "ERROR! get_usrp_power(): Did not receive the correct number of samples!") + return uhd.dsp.signals.get_power_dbfs(recv_buffer[chan]) + + +def subtract_power(p1_db, p2_db): + """ + Return the power of p1 subtracted from p2, where both are given in logarithmic + units. + """ + return 10 * numpy.log10(10**(p1_db/10) - 10**(p2_db/10)) + +############################################################################### +# Base Class +############################################################################### +class USRPCalibratorBase(object): + """ + Base class for device calibration. Every USRP that can do power calibration + needs to implement this. + """ + # These should be overriden to device-specific values + max_input_power = -20 # -20 happens to be a safe value for all devices + min_detectable_signal = -70 + default_rate = DEFAULT_SAMP_RATE + lo_offset = 0.0 + + def __init__(self, usrp, meas_dev, direction, **kwargs): + self._usrp = usrp + self._meas_dev = meas_dev + # This makes sure our measurement will not destroy the DUT + self._meas_dev.max_output_power = self.max_input_power + self._dir = direction + self._desired_gain_step = kwargs.get('gain_step', 10) + self._id = usrp.get_usrp_rx_info(0).get('mboard_id') + self._mb_serial = usrp.get_usrp_rx_info(0).get('mboard_serial') + # Littler helper to print stuff with a device ID prefix. + self.log = lambda *args, **kwargs: print("[{}]".format(self._id), *args, **kwargs) + # Channel, antenna, and streamer will get updated in update_port() + self._chan = None + self._ant = "" + self._streamer = None + # These dictionaries store the results that get written out as well as + # the noise floor for reference + self.results = {} # This must be of the form results[freq][gain] = power + self._noise = {} + # The tone generator object is only needed for Tx measurements, and is + # initialized conditionaly in init() + self._tone_gen = None + # The gains can be overridden by the device if non-equidistant gains are + # desired. However, gains must be increasing order for Tx measurements, + # and in decreasing order for Rx measurements. + self._gains = get_default_gains( + direction, + getattr(self._usrp, 'get_{}_gain_range'.format(self._dir))(), + self._desired_gain_step) + # You might want to override this, but it's not important. It will + # become the 'name' argument for the power cal factory. + self.cal_name = self._id + " Power Cal" + # The child class can store temperature and ref gain here + self.temp = None + self.ref_gain = None + + def init(self, rate, tone_freq, amplitude): + """ + Initialize device with finalized values. Not that __init__() needs to + finish before the call site knows the rate, so we can' fold this into + __init__(). + """ + if self._dir == 'tx': + self._tone_gen = ToneGenerator(rate, tone_freq, amplitude) + + def update_port(self, chan, antenna): + """ + Notify the device that we've switched channel and/or antenna. + """ + self.log("Switching to channel {}, antenna {}.".format(chan, antenna)) + self._ant = antenna + if chan != self._chan: + # This will be an RX streamer for RX power cal, and a TX streamer + # for TX power cal. + self._streamer = get_streamer(self._usrp, self._dir, chan) + if self._dir == 'tx': + self._tone_gen.set_streamer(self._streamer) + self._chan = chan + + def _get_frequencies(self, start_hint=None, stop_hint=None, step_hint=None): + """ + Return an iterable of frequencies for testing. + + The default will check the hints against the device, but otherwise heed + them. + + If a particular device needs to check specific frequencies, then + override this. + """ + start_min = \ + getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))( + self._chan).start() + start_hint = start_hint or start_min + start = max(start_hint, start_min) + stop_max = \ + getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))( + self._chan).stop() + stop_hint = stop_hint or stop_max + stop = min(stop_hint, stop_max) + step = step_hint or DEFAULT_FREQ_STEP + return numpy.arange(start, stop + step, step) + + def init_frequencies(self, start_hint, stop_hint, step_hint): + """ + Return an iterable of frequencies for testing. + + The default will check the hints against the device, but otherwise heed + them. + + Then it will measure the noise floor across frequency to get a good + baseline measurement. + """ + freqs = self._get_frequencies(start_hint, stop_hint, step_hint) + if self._dir == 'tx': + print("===== Measuring noise floor across frequency...") + for freq in freqs: + self._meas_dev.set_frequency(freq) + self._noise[freq] = self._meas_dev.get_power() + print("[TX] Noise floor: {:2} MHz => {:+6.2f} dBm" + .format(freq/1e6, self._noise[freq])) + else: # Rx + print("===== Measuring noise floor across frequency and gain...") + for freq in freqs: + self._noise[freq] = {} + tune_req = uhd.types.TuneRequest(freq) + self._usrp.set_rx_freq(tune_req, self._chan) + for gain in self._gains: + self._usrp.set_rx_gain(gain, self._chan) + self._noise[freq][gain] = get_usrp_power(self._streamer) + print("[RX] Noise floor: {:2} MHz / {} dB => {:+6.2f} dBFS" + .format(freq/1e6, gain, self._noise[freq][gain])) + return freqs + + def start(self): + """ + Initialize the device for calibration + """ + if self._dir == 'tx': + self._tone_gen.start() + else: + self._meas_dev.enable(True) + + def stop(self, store=True): + """ + Shut down the device after calibration + """ + if self._dir == 'tx': + self._tone_gen.stop() + else: + self._meas_dev.enable(False) + if store: + self.store() + + def run_rx_cal(self, freq): + """ + Run the actual RX calibration for this frequency. + """ + # Go to highest gain, lock in signal generator + self._usrp.set_rx_gain(max(self._gains), self._chan) + time.sleep(0.1) # Settling time of the USRP, highly conservative + self.log("Locking in signal generator power...") + self.log("Requesting input power: {:+.2f} dBm." + .format(self.min_detectable_signal)) + usrp_input_power = self._meas_dev.set_power(self.min_detectable_signal) + recvd_power = get_usrp_power(self._streamer) + self.log("Got input power: {:+.2f} dBm. Received power: {:.2f} dBFS. " + "Requesting new input power: {:+.2f} dBm." + .format(usrp_input_power, + recvd_power, + usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power)) + usrp_input_power = self._meas_dev.set_power( + usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power) + siggen_locked = False + for _ in range(SIGPWR_LOCK_MAX_ITER): + recvd_power = get_usrp_power(self._streamer) + if PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM: + siggen_locked = True + break + self.log("Receiving input power: {:+.2f} dBFS.".format(recvd_power)) + power_delta = PWR_EST_IDEAL_LEVEL - recvd_power + # Update power output by the delta from the desired input value: + self.log("Requesting input power: {:+.2f} dBm." + .format(usrp_input_power + power_delta)) + usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta) + if not siggen_locked: + raise RuntimeError( + "Unable to lock siggen within {} iterations! Last input power level: {:+6.2f} dBm." + .format(SIGPWR_LOCK_MAX_ITER, usrp_input_power)) + self.log("Locked signal generator in at input power level: {:+6.2f} dBm." + .format(usrp_input_power)) + # Now iterate through gains + results = {} + # Gains are in decreasing order! + last_gain = self._gains[0] + for gain in self._gains: + self._usrp.set_rx_gain(gain, self._chan) # Set the new gain + self.log("Set gain to: {} dB. Got gain: {} dB." + .format(gain, self._usrp.get_rx_gain(self._chan))) + time.sleep(0.1) # Settling time of the USRP, highly conservative + gain_delta = last_gain - gain # This is our gain step + if gain_delta: + # If we decrease the device gain, we need to crank up the input + # power + usrp_input_power = self._meas_dev.set_power( + min(usrp_input_power + gain_delta, self.max_input_power)) + # usrp_input_power = self._meas_dev.set_power(usrp_input_power + gain_delta) + self.log("New input power is: {:+.2f} dBm".format(usrp_input_power)) + recvd_power = get_usrp_power(self._streamer) + self.log("Received power: {:.2f} dBFS".format(recvd_power)) + # It's possible that we lose the lock on the signal power, so allow + # for a correction + if not PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM: + power_delta = PWR_EST_IDEAL_LEVEL - recvd_power + self.log("Adapting input power to: {:+.2f} dBm." + .format(usrp_input_power + power_delta)) + usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta) + self.log("New input power is: {:+.2f} dBm".format(usrp_input_power)) + # And then of course, measure again + recvd_power = get_usrp_power(self._streamer) + self.log("Received power: {:.2f} dBFS".format(recvd_power)) + # Note: The noise power should be way down there, and really + # shouldn't matter. We subtract it anyway for formal correctness. + recvd_signal_power = subtract_power(recvd_power, self._noise[freq][gain]) + # A note on the following equation: 'recvd_signal_power' is in dBFS, + # and usrp_input_power is in dBm. However, this is the reference + # signal, so we need the power (in dBm) that corresponds to 0 dBFS. + # The assumption is that digital gain is linear, so what we really + # want is usrp_input_power - (recvd_signal_power - 0dBFS), and the + # result of the equation is in dBm again. We omit the subtract-by-zero + # since our variables don't have units. + results[gain] = usrp_input_power - recvd_signal_power + self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain])) + # If we get too close to the noise floor, we stop + if recvd_power - self._noise[freq][gain] <= 1.5: + self.log("Can no longer detect input signal. Terminating.") + break + self.results[freq] = results + + def run_tx_cal(self, freq): + """ + Run the actual TX calibration for this frequency. + """ + results = {} + for gain in self._gains: + self._usrp.set_tx_gain(gain, self._chan) + time.sleep(0.1) # Settling time of the USRP, highly conservative + results[gain] = self._meas_dev.get_power() + self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain])) + self.results[freq] = results + + def store(self): + """ + Return the results object + """ + chan_info = getattr(self._usrp, "get_usrp_{}_info".format(self._dir))(self._chan) + cal_key = chan_info.get("{}_ref_power_key".format(self._dir)) + cal_serial = chan_info.get("{}_ref_power_serial".format(self._dir)) + cal_data = uhd.usrp.cal.PwrCal(self.cal_name, cal_serial, int(time.time())) + if self.temp: + cal_data.set_temperature(self.temp) + if self.ref_gain: + cal_data.set_ref_gain(self.ref_gain) + for freq, results in self.results.items(): + max_power = max(results.values()) + min_power = min(results.values()) + cal_data.add_power_table(results, min_power, max_power, freq) + database.write_cal_data( + cal_key, + cal_serial, + cal_data.serialize()) + self.results = {} + +class B200Calibrator(USRPCalibratorBase): + """ + B200 calibration + """ + mboard_ids = ('B200', 'B210', 'B200mini', 'B205mini') + # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive + # a tone. By default, the auto MCR will kick in and set the MCR to 40 Msps, + # thus engaging all halfbands. + default_rate = 5e6 + # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within + # our estimate. B200 generally has good DC offset / IQ balance performance, + # but we still try and avoid DC as much as possible. + lo_offset = 10e6 + + def __init__(self, usrp, meas_dev, direction, **kwargs): + super().__init__(usrp, meas_dev, direction, **kwargs) + print("===== Reading temperature... ", end="") + self.temp = self._usrp.get_rx_sensor("temp").to_int() + print("{} C".format(self.temp)) + # TODO don't hard code + self.ref_gain = 60 + +class X300Calibrator(USRPCalibratorBase): + """ + X300/X310 calibration + + Notes / TODOs: + - The X310 must avoid frequencies that are multiples of 200 MHz; here be + harmonics (TODO) + - TwinRX needs its own special method. It needs to be run once with one + channel in the streamer, and once with two channels in the streamer. Or we + come up with a clever way of enabling the other channel without modifying + the streamer. A poke to the prop tree might do the trick. + """ + mboard_ids = ('X300', 'X310', 'NI-2974') + # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive + # a tone. It's 1/40 the master clock rate, which means it'll engage max + # halfbands. + default_rate = 5e6 + # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within + # our estimate, so it doesn't matter if this device is DC offset / IQ balance + # calibrated. + lo_offset = 10e6 + + +############################################################################### +# The dispatch function +############################################################################### +def get_usrp_calibrator(usrp, meas_dev, direction, **kwargs): + """ + Return a USRP calibrator object. + """ + usrp_type = \ + getattr(usrp, 'get_usrp_{}_info'.format(direction))().get('mboard_id') + if usrp_type is None: + raise RuntimeError("Could not determine USRP type!") + print("=== Detected USRP type:", usrp_type) + for _, obj in inspect.getmembers(sys.modules[__name__]): + try: + if issubclass(obj, USRPCalibratorBase) \ + and usrp_type in getattr(obj, 'mboard_ids', ''): + return obj(usrp, meas_dev, direction, **kwargs) + except TypeError: + continue + raise RuntimeError("No USRP calibrator object found for device type: {}" + .format(usrp_type)) diff --git a/host/python/uhd/usrp/cal/visa.py b/host/python/uhd/usrp/cal/visa.py new file mode 100644 index 000000000..16c2aff56 --- /dev/null +++ b/host/python/uhd/usrp/cal/visa.py @@ -0,0 +1,129 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +VISA-type measurement devices +""" + +import re +import sys +import inspect +import importlib + +class VISADevice: + """ + Parent class for VISA/SCPI devices (that can be accessed via PyVISA) + """ + # Must contain a dictionary res_ids. See class below as an example + + def __init__(self, resource): + self.res = resource + # You can query the exact model etc. like this: + # id = self._res.query("*IDN?").strip( + + def init_power_meter(self): + """ + Initialize this device to measure power. + """ + raise NotImplementedError() + + def init_signal_generator(self): + """ + Initialize this device to generate signals. + """ + raise NotImplementedError() + + def set_frequency(self, freq): + """ + Set frequency + """ + raise NotImplementedError() + + def get_power_dbm(self): + """ + Return the received/measured power in dBm (power meter) or return the + output power it's currently set to (signal generator). + """ + raise NotImplementedError() + + +class USBPowerMeter(VISADevice): + """ + USB-based power measurement devices + """ + # The keys of res_ids are regular expressions, which have to match the + # resource ID of the VISA device. The values are a name for the device, which + # is used for informing the user which driver was found. + # + # A class can match any number of resource IDs. If the commands used depend + # on the specific ID, it can be queried in the appropriate init function + # using the *IDN? command which is understood by all VISA devices. + res_ids = { + r'USB\d+::2733::376::\d+::0::INSTR': 'R&S NRP-6A', + } + + def init_power_meter(self): + """ + Enable the sensor to read power + """ + self.res.timeout = 5000 + self.res.write("SENS:AVER:COUN 20") + self.res.write("SENS:AVER:COUN:AUTO ON") + self.res.write('UNIT:POW DBM') + self.res.write('SENS:FUNC "POW:AVG"') + + def init_signal_generator(self): + """ + This class is for power meters, so no bueno + """ + raise RuntimeError("This device cannot be used for signal generation!") + + def set_frequency(self, freq): + """ + Set frequency + """ + self.res.write('SENS:FREQ {}'.format(freq)) + + def get_power_dbm(self): + """ + Return measured power in dBm + """ + self.res.write('INIT:IMM') + return float(self.res.query('FETCH?')) + +############################################################################### +# The dispatch function +############################################################################### +def get_visa_device(resource, key, opt_dict): + """ + Return the VISA device object + """ + def match_res(obj): + """ + Check if a class obj matches the requested key + """ + for pattern, res_id in getattr(obj, 'res_ids', {}).items(): + if re.match(pattern, key): + print("Found VISA device: {}".format(res_id)) + return True + return False + # Import additional modules if requested + members = inspect.getmembers(sys.modules[__name__]) + if 'import' in opt_dict: + try: + print("Loading external module: {}".format(opt_dict.get('import'))) + external_module = importlib.import_module(opt_dict.get('import')) + members += inspect.getmembers(external_module) + except (ModuleNotFoundError, ImportError): + print("WARNING: Could not import module '{}'" + .format(opt_dict.get('import'))) + # Now browse classes and find one that matches + for _, obj in members: + try: + if issubclass(obj, VISADevice) and match_res(obj): + return obj(resource) + except TypeError: + continue + raise RuntimeError("No VISA device class found for key: {}".format(key)) diff --git a/host/utils/CMakeLists.txt b/host/utils/CMakeLists.txt index 08ace6a0c..4bbcd252b 100644 --- a/host/utils/CMakeLists.txt +++ b/host/utils/CMakeLists.txt @@ -50,6 +50,7 @@ set(util_share_sources set(util_share_sources_py converter_benchmark.py convert_cal_data.py + uhd_power_cal.py ) if(ENABLE_USB) list(APPEND util_share_sources diff --git a/host/utils/uhd_power_cal.py b/host/utils/uhd_power_cal.py new file mode 100644 index 000000000..b71e8f3ab --- /dev/null +++ b/host/utils/uhd_power_cal.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Utility to run power calibrations with USRPs +""" + +import sys +import math +import pickle +import argparse +import uhd + +def parse_args(): + """ Parse args and return args object """ + parser = argparse.ArgumentParser( + description="Run power level calibration for supported USRPs", + ) + parser.add_argument( + '--args', default="", + help="USRP Device Args", + ) + parser.add_argument( + '-d', '--dir', default='tx', + help="Direction: Must be either rx or tx. From the perspective of the " + "device: rx == receive power levels, tx == transmit power levels.", + choices=['tx', 'rx']) + parser.add_argument( + '--start', + help='Start Frequency. Defaults to lowest available frequency on device. ' + 'Note that this is only a hint for the device object, which can choose ' + 'to override this value.', type=float) + parser.add_argument( + '--stop', + help='Stop Frequency. Defaults to highest available frequency on device. ' + 'Note that this is only a hint for the device object, which can choose ' + 'to override this value.', type=float) + parser.add_argument( + '--step', + help='Frequency Step. Defaults are device-specific.' + 'Note that this is only a hint for the device object, which can choose ' + 'to override this value. Devices can also measure at non-regular ' + 'frequencies, e.g., to more accurately cover differences between bands.', + type=float) + parser.add_argument( + '--gain-step', type=float, default=1, + help='Gain Step (dB). Defaults to 1 dB.' + 'Note that this is only a hint for the device object, which can choose ' + 'to override this value. Devices can also measure at non-regular ' + 'gain intervals.') + parser.add_argument( + '--lo-offset', type=float, + help='LO Offset. This gets applied to every tune request. Note that for ' + 'TX measurements, there is also an offset applied by --tone-freq. ' + 'The default value is device-dependent.') + parser.add_argument( + '--amplitude', type=float, default=1./math.sqrt(2), + help='Amplitude of the tone that is generated for tx measurements. ' + 'Default is 1/sqrt(2), or -3 dBFS.') + parser.add_argument( + '--attenuation', type=float, default=0.0, + help='Amount of attenuation between measurement device and DUT. This will ' + 'be accounted for by simple addition, it is treated like a measurement error. ' + 'The argument is generally positive, e.g. 30 means 30 dB of attenuation.') + parser.add_argument( + '--tone-freq', type=float, default=1e6, + help='Frequency of the tone that is generated for Tx measurements. This' + 'has the same effect as setting an LO offset, except in software.') + parser.add_argument( + '--antenna', default="*", + help="Select antenna port. A value of '*' means that the calibration " + "will be repeated on all appropriate antenna ports.") + parser.add_argument( + '--channels', default="0", + help="Select channel. A value of '*' means that the calibration " + "will be repeated on all appropriate channels.") + parser.add_argument( + '--meas-dev', default='manual', + help='Type of measurement device that is used') + parser.add_argument( + '-o', '--meas-option', default=[], action='append', + help='Options that are passed to the measurement device') + parser.add_argument( + '-r', '--rate', type=float, + help='Sampling rate at which the calibration is performed') + parser.add_argument( + '--store', metavar='filename.pickle', + help='If provided, will store intermediate cal data. This can be analyzed ' + 'separately, or loaded into the tool with --load.') + parser.add_argument( + '--load', metavar='filename.pickle', + help='If provided, will load intermediate cal data instead of running a ' + 'measurement.') + return parser.parse_args() + + +def sanitize_args(usrp, args, default_rate): + """ + Check the args against the USRP object. + """ + assert usrp.get_num_mboards() == 1, \ + "Power calibration tools are designed for a single motherboard!" + available_chans = getattr(usrp, 'get_{}_num_channels'.format(args.dir))() + if args.channels == '*': + # * means all channels + channels = list(range(available_chans)) + else: + try: + channels = [int(c) for c in args.channels.split(',')] + except ValueError: + raise ValueError("Invalid channel list: {}".format(args.channels)) + for chan in channels: + assert chan in range(available_chans), \ + "ERROR: Invalid channel: {}. Should be in 0..{}.".format( + chan, available_chans) + print("=== Calibrating for channels:", ", ".join([str(x) for x in channels])) + available_ants = getattr(usrp, 'get_{}_antennas'.format(args.dir))() + if args.antenna == '*': + invalid_antennas = ('CAL', 'LOCAL') + antennas = [x for x in available_ants if x not in invalid_antennas] + else: + try: + antennas = args.antenna.split(',') + except ValueError: + raise ValueError("Invalid antenna list: {}".format(args.antenna)) + for ant in antennas: + assert ant in available_ants, \ + "Invalid antenna: {}. Should be in {}.".format( + ant, available_ants) + print("=== Calibrating for antennas:", ", ".join([str(x) for x in antennas])) + rate = args.rate or default_rate + getattr(usrp, 'set_{}_rate'.format(args.dir))(rate) + actual_rate = getattr(usrp, 'get_{}_rate'.format(args.dir))() + print("=== Requested sampling rate: {} Msps, actual rate: {} Msps" + .format(rate/1e6, actual_rate/1e6)) + if args.dir == 'tx' and abs(args.tone_freq) > actual_rate: + raise ValueError( + "The TX tone frequency offset of {} kHz is greater than the sampling rate." + .format(args.tone_freq / 1e3)) + return channels, antennas, actual_rate + + +def init_results(pickle_file): + """ + Initialize results from pickle file, or empty dict + """ + if pickle_file is None: + return {} + with open(pickle_file, 'rb') as results_file: + return pickle.load(results_file) + +class CalRunner: + """ + Executor of the calibration routines. + """ + def __init__(self, usrp, usrp_cal, meas_dev, args): + self.usrp = usrp + self.usrp_cal = usrp_cal + self.meas_dev = meas_dev + self.dir = args.dir + self.tone_offset = args.tone_freq if args.dir == 'tx' else 0.0 + self.lo_offset = args.lo_offset if args.lo_offset else usrp_cal.lo_offset + if self.lo_offset: + print("=== Using USRP LO offset: {:.2f} MHz" + .format(self.lo_offset / 1e6)) + + def run(self, chan, freq): + """ + Run all cal steps for a single frequency + """ + print("=== Running calibration at frequency {:.3f} MHz...".format(freq / 1e6)) + tune_req = uhd.types.TuneRequest(freq, self.lo_offset) + getattr(self.usrp, 'set_{}_freq'.format(self.dir))(tune_req, chan) + actual_freq = getattr(self.usrp, 'get_{}_freq'.format(self.dir))(chan) + if abs(actual_freq - freq) > 1.0: + print("WARNING: Frequency was coerced from {:.2f} MHz to {:.2f} MHz!" + .format(freq / 1e6, actual_freq / 1e6)) + self.meas_dev.set_frequency(actual_freq + self.tone_offset) + getattr(self.usrp_cal, 'run_{}_cal'.format(self.dir))(freq) + +def main(): + """Go, go, go!""" + args = parse_args() + print("=== Detecting USRP...") + usrp = uhd.usrp.MultiUSRP(args.args) + print("=== Measurement direction:", args.dir) + print("=== Initializing measurement device...") + meas_dev = uhd.usrp.cal.get_meas_device(args.dir, args.meas_dev, args.meas_option) + meas_dev.power_offset = args.attenuation + # If we're transmitting, then we need to factor in the "attenuation" from us + # not transmitting at full scale + if args.dir == 'tx': + meas_dev.power_offset -= 20 * math.log10(args.amplitude) + print("=== Initializing USRP calibration object...") + usrp_cal = uhd.usrp.cal.get_usrp_calibrator( + usrp, meas_dev, args.dir, + gain_step=args.gain_step, + ) + channels, antennas, rate = sanitize_args(usrp, args, usrp_cal.default_rate) + results = init_results(args.load) + usrp_cal.init( + rate=rate, + tone_freq=args.tone_freq, + amplitude=args.amplitude, + ) + print("=== Launching calibration...") + cal_runner = CalRunner(usrp, usrp_cal, meas_dev, args) + for chan in channels: + if chan not in results: + results[chan] = {} + for ant in antennas: + if ant in results[chan]: + print("=== Using pickled data for channel {}, antenna {}." + .format(chan, ant)) + continue + print("=== Running calibration for channel {}, antenna {}." + .format(chan, ant)) + # Set up all the objects + getattr(usrp, 'set_{}_antenna'.format(args.dir))(ant, chan) + meas_dev.update_port(chan, ant) + usrp_cal.update_port(chan, ant) + freqs = usrp_cal.init_frequencies(args.start, args.stop, args.step) + usrp_cal.start() # This will activate siggen + # Now calibrate + for freq in freqs: + try: + cal_runner.run(chan, freq) + except RuntimeError as ex: + print("ERROR: Stopping calibration due to exception: {}" + .format(str(ex))) + usrp_cal.stop() + return 1 + # Store results for pickling and shut down for next antenna port + results[chan][ant] = usrp_cal.results + usrp_cal.stop() # This will deactivate siggen and store the data + if args.store: + print("=== Storing pickled calibration data to {}...".format(args.store)) + with open(args.store, 'wb') as results_file: + pickle.dump(results, results_file) + return 0 + +if __name__ == "__main__": + try: + sys.exit(main()) + except (RuntimeError, ValueError) as ex: + print("ERROR:", str(ex)) + sys.exit(1) |