aboutsummaryrefslogtreecommitdiffstats
path: root/host/python
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2020-04-15 15:10:37 -0700
committerAaron Rossetto <aaron.rossetto@ni.com>2020-05-28 15:05:19 -0500
commit65fbf053c059c888fe544327f841313641561255 (patch)
tree396d0b156f533d0c4d3a64288398af48102b247b /host/python
parent56f04e4283cf53803a5994e57364cce89232e545 (diff)
downloaduhd-65fbf053c059c888fe544327f841313641561255.tar.gz
uhd-65fbf053c059c888fe544327f841313641561255.tar.bz2
uhd-65fbf053c059c888fe544327f841313641561255.zip
utils/python: Add uhd_power_cal script
This is a tool for running power calibration.
Diffstat (limited to 'host/python')
-rw-r--r--host/python/uhd/__init__.py1
-rw-r--r--host/python/uhd/usrp/cal/__init__.py3
-rw-r--r--host/python/uhd/usrp/cal/meas_device.py353
-rw-r--r--host/python/uhd/usrp/cal/tone_gen.py61
-rw-r--r--host/python/uhd/usrp/cal/usrp_calibrator.py408
-rw-r--r--host/python/uhd/usrp/cal/visa.py129
6 files changed, 955 insertions, 0 deletions
diff --git a/host/python/uhd/__init__.py b/host/python/uhd/__init__.py
index e12a0626a..8b6a2b36c 100644
--- a/host/python/uhd/__init__.py
+++ b/host/python/uhd/__init__.py
@@ -11,4 +11,5 @@ from . import types
from . import usrp
from . import filters
from . import rfnoc
+from . import dsp
from .libpyuhd.paths import *
diff --git a/host/python/uhd/usrp/cal/__init__.py b/host/python/uhd/usrp/cal/__init__.py
index 77cc3ca35..53de91114 100644
--- a/host/python/uhd/usrp/cal/__init__.py
+++ b/host/python/uhd/usrp/cal/__init__.py
@@ -14,3 +14,6 @@ Python UHD Module: Calibration sub-module
# pylint: disable=wildcard-import
from .libtypes import *
# pylint: enable=wildcard-import
+
+from .meas_device import get_meas_device
+from .usrp_calibrator import get_usrp_calibrator
diff --git a/host/python/uhd/usrp/cal/meas_device.py b/host/python/uhd/usrp/cal/meas_device.py
new file mode 100644
index 000000000..22fe78c11
--- /dev/null
+++ b/host/python/uhd/usrp/cal/meas_device.py
@@ -0,0 +1,353 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Measurement Device Class for UHD Power Calibration
+"""
+
+import sys
+import time
+import inspect
+import importlib
+import numpy
+import uhd
+from .tone_gen import ToneGenerator
+from .visa import get_visa_device
+
+###############################################################################
+# Base Classes
+###############################################################################
+class PowerMeterBase:
+ """
+ Base class for measuring output power (Tx) of the USRP. That means the
+ measurement device is receiving and the USRP (the DUT) is transmitting.
+ """
+ def __init__(self, options):
+ self._options = options
+ self.power_offset = 0
+
+ def set_frequency(self, freq):
+ """
+ Set the frequency of the measurement device.
+ """
+ raise NotImplementedError()
+
+ def get_power(self):
+ """
+ Return the current measured power in dBm.
+ """
+ return self._get_power() + self.power_offset
+
+ def _get_power(self):
+ """
+ Return the current measured power in dBm.
+ """
+ raise NotImplementedError()
+
+ # pylint: disable=no-self-use
+ def update_port(self, chan, antenna):
+ """
+ Tell the device we're measuring chan + antenna next
+ """
+ input("[TX] Connect your power meter to device channel {}, "
+ "antenna {}. Then, hit Enter.".format(chan, antenna))
+ # pylint: enable=no-self-use
+
+class SignalGeneratorBase:
+ """
+ Base class for measuring input power (Rx) of the USRP. That means the
+ measurement device is transmitting and the USRP (the DUT) is receiving.
+ """
+ def __init__(self, options):
+ self._options = options
+ self.power_offset = 0
+ # Make sure to set this before doing RX cal
+ self.max_output_power = None
+
+ def enable(self, enb=True):
+ """
+ Turn on the power generator. By default, it should be off, and only
+ produce a signal when this was called with an argument value of 'True'.
+ """
+ raise NotImplementedError()
+
+ def set_power(self, power_dbm):
+ """
+ Set the input power of the DUT. This will factor in the power offset,
+ and set the measurement device to produce the power that will cause the
+ DUT to receive power_dbm.
+
+ This will coerce to the next possible power available and return the
+ coerced value.
+ """
+ assert self.max_output_power
+ if power_dbm > self.max_output_power:
+ print("[SigGen] WARNING! Trying to set power beyond safe levels. "
+ "Capping output power at {} dBm.".format(self.max_output_power))
+ power_dbm = self.max_output_power
+ return self._set_power(power_dbm + self.power_offset) - self.power_offset
+
+ def get_power(self):
+ """
+ Return the input power of the DUT. This will factor in the power offset,
+ and will return the power level in dBm that is going into the DUT.
+ Use this with set_power(), as not all power levels can be reached.
+ """
+ return self._get_power() - self.power_offset
+
+ def set_frequency(self, freq):
+ """
+ Set the center frequency of the generated signal.
+ """
+ raise NotImplementedError()
+
+ def _set_power(self, power_dbm):
+ """
+ Set the output power of the device in dBm.
+ """
+ raise NotImplementedError()
+
+ def _get_power(self):
+ """
+ Return the output power of the measurement device.
+ """
+ raise NotImplementedError()
+
+ # pylint: disable=no-self-use
+ def update_port(self, chan, antenna):
+ """
+ Tell the device we're measuring chan + antenna next
+ """
+ input("[RX] Connect your signal generator to device channel {}, "
+ "antenna {}. Then, hit Enter.".format(chan, antenna))
+ # pylint: enable=no-self-use
+
+
+###############################################################################
+# Manual Measurement: For masochists, or for small sample sets
+###############################################################################
+class ManualPowerMeter(PowerMeterBase):
+ """
+ Manual measurement: The script does nothing, it just asks the user to
+ manually make changes and return values
+ """
+ key = 'manual'
+
+ def set_frequency(self, freq):
+ """
+ Ask user to set frequency
+ """
+ input("[TX] Set your power meter to following frequency: "
+ "{:.3f} MHz, then hit Enter.".format(freq/1e6))
+
+ def _get_power(self):
+ """
+ Ask user for the power
+ """
+ num_tries = 5
+ for _ in range(num_tries):
+ try:
+ return float(input("[TX] Please enter the measured power in dBm: ")) \
+ + self.power_offset
+ except ValueError:
+ continue
+ raise ValueError("Invalid power value entered.")
+
+class ManualPowerGenerator(SignalGeneratorBase):
+ """
+ Manual measurement: The script does nothing, it just asks the user to
+ manually make changes and return values
+ """
+ key = 'manual'
+ num_tries = 5
+
+ def enable(self, enable=True):
+ """
+ Ask the user to turn the device on or off
+ """
+ input("[RX] Please {} your signal generator and hit Enter."
+ .format("enable" if enable else "disable"))
+
+ def _set_power(self, power_dbm):
+ """
+ Ask for a power, or the closest, and return that
+ """
+ new_power = input(
+ "[RX] Set your signal generator to following output power: "
+ "{:.1f} dBm, then hit Enter, or enter the closest available power: "
+ .format(power_dbm))
+ if not new_power:
+ return power_dbm
+ for _ in range(self.num_tries):
+ try:
+ return float(new_power)
+ except ValueError:
+ new_power = input(
+ "[RX] Set your signal generator to following output power: "
+ "{:.1f} dBm, then hit Enter, or enter the closest available power: "
+ .format(power_dbm))
+ if not new_power:
+ return power_dbm
+ raise ValueError("Invalid power value entered.")
+
+ def _get_power(self):
+ """
+ Ask user for current power
+ """
+ for _ in range(self.num_tries):
+ try:
+ return float(input(
+ "[RX] Please enter the output power in dBm of your "
+ "signal generator: "))
+ except ValueError:
+ continue
+ raise ValueError("Invalid power value entered.")
+
+ # pylint: disable=no-self-use
+ def set_frequency(self, freq):
+ """
+ Set the center frequency of the generated signal.
+ """
+ input("[RX] Set your signal generator to following frequency: {:.3f} MHz, then hit Enter."
+ .format(freq/1e6))
+ # pylint: enable=no-self-use
+
+###############################################################################
+# VISA: Run through a VISA device, using SCPI commands
+###############################################################################
+class VisaPowerMeter(PowerMeterBase):
+ """
+ VISA based Tx measurement device
+ """
+ DEFAULT_VISA_LIB = '@py' # pyvisa-py
+ DEFAULT_VISA_QUERY = "?*::INSTR"
+
+ key = 'visa'
+
+ def __init__(self, options):
+ super().__init__(options)
+ # pylint: disable=import-outside-toplevel
+ # We disable this warning because having pyvisa installed is not a
+ # requirement, so we want to load it as late as possible, and only when
+ # needed.
+ import pyvisa
+ # pylint: enable=import-outside-toplevel
+ visa_lib = options.get('visa_lib', self.DEFAULT_VISA_LIB)
+ visa_query = options.get('visa_query', self.DEFAULT_VISA_QUERY)
+ self._rm = pyvisa.ResourceManager(visa_lib)
+ resources = self._rm.list_resources(visa_query)
+ if len(resources) > 1:
+ print("Found VISA devices:")
+ for resource in resources:
+ print("*" + resource)
+ raise RuntimeError(
+ "Found more than one measurement device. Please limit the query!")
+ if len(resources) == 0:
+ raise RuntimeError("No measurement device found!")
+ self._res = self._rm.open_resource(resources[0])
+ self.visa = get_visa_device(self._res, resources[0], options)
+ self.visa.init_power_meter()
+
+ def set_frequency(self, freq):
+ """
+ Set frequency
+ """
+ self.visa.set_frequency(freq)
+
+ def _get_power(self):
+ """
+ Get power
+ """
+ return self.visa.get_power_dbm()
+
+###############################################################################
+# USRP: Use a pre-calibrated USRP as a measurement device
+###############################################################################
+class USRPPowerGenerator(SignalGeneratorBase):
+ """
+ The power generator is actually a USRP. This only works if the USRP that is
+ used for power/signal generation has been previously calbrated itself.
+ """
+ key = 'usrp'
+
+ def __init__(self, options):
+ super().__init__(options)
+ self._usrp = uhd.usrp.MultiUSRP(options.get('args'))
+ self._rate = float(options.get('rate', 5e6))
+ self._lo_offset = float(options.get('lo_offset', 0))
+ self._chan = int(options.get('chan', 0))
+ self._amplitude = float(options.get('ampl', 1/numpy.sqrt(2)))
+ self._pwr_dbfs = 20 * numpy.log10(self._amplitude)
+ self._tone_freq = 0
+ stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
+ stream_args.channels = [self._chan]
+ self._streamer = self._usrp.get_tx_stream(stream_args)
+ print("==== Creating USRP tone generator. Power offset:", self.power_offset)
+ self._tone_gen = ToneGenerator(self._rate, self._tone_freq, self._amplitude)
+ self._tone_gen.set_streamer(self._streamer)
+
+ def enable(self, enb=True):
+ """
+ Turn the tone generator on or off.
+ """
+ if enb:
+ print("[SigGen] Starting tone generator.")
+ self._tone_gen.start()
+ else:
+ print("[SigGen] Stopping tone generator.")
+ self._tone_gen.stop()
+ time.sleep(0.1) # Give it some time to spin or down
+
+ def set_frequency(self, freq):
+ """
+ Set the center frequency of the generated signal.
+ """
+ print("[SigGen] Channel {}: Tuning signal to {:.3f} MHz."
+ .format(self._chan, freq/1e6))
+ tune_req = uhd.types.TuneRequest(freq, self._lo_offset)
+ self._usrp.set_tx_freq(tune_req, self._chan)
+
+ def _set_power(self, power_dbm):
+ """
+ Set the output power of the device in dBm.
+ """
+ self._usrp.set_tx_power_reference(power_dbm - self._pwr_dbfs, self._chan)
+ return self._get_power()
+
+ def _get_power(self):
+ """
+ Return the output power of the measurement device.
+ """
+ return self._usrp.get_tx_power_reference(self._chan) + self._pwr_dbfs
+
+###############################################################################
+# The dispatch function
+###############################################################################
+def get_meas_device(direction, dev_key, options):
+ """
+ Return the measurement device object
+ """
+ assert direction in ('tx', 'rx')
+ base_class = SignalGeneratorBase if direction == 'rx' else PowerMeterBase
+ opt_dict = {
+ k[0]: k[1] if len(k) > 1 else None for k in [x.split("=", 1) for x in options]
+ }
+ members = inspect.getmembers(sys.modules[__name__])
+ if 'import' in opt_dict:
+ try:
+ print("Loading external module: {}".format(opt_dict.get('import')))
+ external_module = importlib.import_module(opt_dict.get('import'))
+ members += inspect.getmembers(external_module)
+ except (ModuleNotFoundError, ImportError):
+ print("WARNING: Could not import module '{}'"
+ .format(opt_dict.get('import')))
+ for _, obj in members:
+ try:
+ if issubclass(obj, base_class) and dev_key == getattr(obj, 'key', ''):
+ return obj(opt_dict)
+ except TypeError:
+ continue
+ raise RuntimeError("No {} measurement device found for key: {}".format(
+ direction.upper(), dev_key))
diff --git a/host/python/uhd/usrp/cal/tone_gen.py b/host/python/uhd/usrp/cal/tone_gen.py
new file mode 100644
index 000000000..b9d986132
--- /dev/null
+++ b/host/python/uhd/usrp/cal/tone_gen.py
@@ -0,0 +1,61 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Provide a tone generator class for USRPs.
+"""
+
+import threading
+import numpy
+import uhd
+
+class ToneGenerator:
+ """
+ Class that can output a tone from a different thread until told to stop
+ """
+ def __init__(self, rate, freq, ampl, streamer=None):
+ self._streamer = streamer
+ self._buffer = uhd.dsp.signals.get_continuous_tone(rate, freq, ampl)
+ self._run = False
+ self._thread = None
+
+ def set_streamer(self, streamer):
+ """
+ Update streamer object
+ """
+ if self._run:
+ self.stop()
+ self._streamer = streamer
+
+ def start(self):
+ """
+ Spawn the thread in the background
+ """
+ if not self._streamer:
+ raise RuntimeError("No streamer defined!")
+ self._run = True
+ self._thread = threading.Thread(target=self._worker)
+ self._thread.start()
+ self._thread.setName("cal_tx")
+
+ def stop(self):
+ """
+ Stop the transmitter
+ """
+ self._run = False
+ self._thread.join()
+ self._thread = None
+
+ def _worker(self):
+ """ Here is where the action happens """
+ metadata = uhd.types.TXMetadata()
+ while self._run:
+ # Give it a long-ish timeout so we don't have to throttle in here
+ if self._streamer.send(self._buffer, metadata, 1.0) != len(self._buffer):
+ print("WARNING: Failed to transmit entire buffer in ToneGenerator!")
+ # Send an EOB packet with a single zero-valued sample to close out TX
+ metadata.end_of_burst = True
+ self._streamer.send(
+ numpy.array([0], dtype=numpy.complex64), metadata, 0.1)
diff --git a/host/python/uhd/usrp/cal/usrp_calibrator.py b/host/python/uhd/usrp/cal/usrp_calibrator.py
new file mode 100644
index 000000000..a70747201
--- /dev/null
+++ b/host/python/uhd/usrp/cal/usrp_calibrator.py
@@ -0,0 +1,408 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+UHD Power Cal: USRP Calbration Utility Objects
+"""
+
+import time
+import inspect
+import sys
+import numpy
+import uhd
+
+from . import database
+from .tone_gen import ToneGenerator
+
+NUM_SAMPS_PER_EST = int(1e6)
+# Limits for the power estimation algorithm. For good estimates, we want the
+# signal to be at -6 dBFS, but not outside of an upper or lower limit.
+PWR_EST_LLIM = -20
+PWR_EST_IDEAL_LEVEL = -6
+PWR_EST_ULIM = -3
+SIGPWR_LOCK_MAX_ITER = 4
+
+# The default distance between frequencies at which we measure
+DEFAULT_FREQ_STEP = 10e6 # Hz
+DEFAULT_SAMP_RATE = 5e6
+
+def get_streamer(usrp, direction, chan):
+ """
+ Create an appropriate streamer object for this channel
+ """
+ stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
+ stream_args.channels = [chan]
+ return usrp.get_rx_stream(stream_args) if direction == 'rx' \
+ else usrp.get_tx_stream(stream_args)
+
+def get_default_gains(direction, gain_range, gain_step):
+ """
+ Create a equidistant gain range for calibration
+ """
+ assert direction in ('rx', 'tx')
+ if direction == 'tx':
+ return numpy.arange(0, gain_range.stop(), gain_step)
+ return numpy.arange(gain_range.stop(), 0, -gain_step)
+
+def get_usrp_power(streamer, num_samps=NUM_SAMPS_PER_EST, chan=0):
+ """
+ Return the measured input power in dBFS
+
+ The return value is a list of dBFS power values, one per channel.
+ """
+ recv_buffer = numpy.zeros(
+ (streamer.get_num_channels(), num_samps), dtype=numpy.complex64)
+ metadata = uhd.types.RXMetadata()
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
+ stream_cmd.num_samps = num_samps
+ stream_cmd.stream_now = True
+ streamer.issue_stream_cmd(stream_cmd)
+ # Pass in long timeout, so we can rx the entire buffer in one go
+ samps_recvd = streamer.recv(recv_buffer, metadata, 5.0)
+ if samps_recvd != num_samps:
+ raise RuntimeError(
+ "ERROR! get_usrp_power(): Did not receive the correct number of samples!")
+ return uhd.dsp.signals.get_power_dbfs(recv_buffer[chan])
+
+
+def subtract_power(p1_db, p2_db):
+ """
+ Return the power of p1 subtracted from p2, where both are given in logarithmic
+ units.
+ """
+ return 10 * numpy.log10(10**(p1_db/10) - 10**(p2_db/10))
+
+###############################################################################
+# Base Class
+###############################################################################
+class USRPCalibratorBase(object):
+ """
+ Base class for device calibration. Every USRP that can do power calibration
+ needs to implement this.
+ """
+ # These should be overriden to device-specific values
+ max_input_power = -20 # -20 happens to be a safe value for all devices
+ min_detectable_signal = -70
+ default_rate = DEFAULT_SAMP_RATE
+ lo_offset = 0.0
+
+ def __init__(self, usrp, meas_dev, direction, **kwargs):
+ self._usrp = usrp
+ self._meas_dev = meas_dev
+ # This makes sure our measurement will not destroy the DUT
+ self._meas_dev.max_output_power = self.max_input_power
+ self._dir = direction
+ self._desired_gain_step = kwargs.get('gain_step', 10)
+ self._id = usrp.get_usrp_rx_info(0).get('mboard_id')
+ self._mb_serial = usrp.get_usrp_rx_info(0).get('mboard_serial')
+ # Littler helper to print stuff with a device ID prefix.
+ self.log = lambda *args, **kwargs: print("[{}]".format(self._id), *args, **kwargs)
+ # Channel, antenna, and streamer will get updated in update_port()
+ self._chan = None
+ self._ant = ""
+ self._streamer = None
+ # These dictionaries store the results that get written out as well as
+ # the noise floor for reference
+ self.results = {} # This must be of the form results[freq][gain] = power
+ self._noise = {}
+ # The tone generator object is only needed for Tx measurements, and is
+ # initialized conditionaly in init()
+ self._tone_gen = None
+ # The gains can be overridden by the device if non-equidistant gains are
+ # desired. However, gains must be increasing order for Tx measurements,
+ # and in decreasing order for Rx measurements.
+ self._gains = get_default_gains(
+ direction,
+ getattr(self._usrp, 'get_{}_gain_range'.format(self._dir))(),
+ self._desired_gain_step)
+ # You might want to override this, but it's not important. It will
+ # become the 'name' argument for the power cal factory.
+ self.cal_name = self._id + " Power Cal"
+ # The child class can store temperature and ref gain here
+ self.temp = None
+ self.ref_gain = None
+
+ def init(self, rate, tone_freq, amplitude):
+ """
+ Initialize device with finalized values. Not that __init__() needs to
+ finish before the call site knows the rate, so we can' fold this into
+ __init__().
+ """
+ if self._dir == 'tx':
+ self._tone_gen = ToneGenerator(rate, tone_freq, amplitude)
+
+ def update_port(self, chan, antenna):
+ """
+ Notify the device that we've switched channel and/or antenna.
+ """
+ self.log("Switching to channel {}, antenna {}.".format(chan, antenna))
+ self._ant = antenna
+ if chan != self._chan:
+ # This will be an RX streamer for RX power cal, and a TX streamer
+ # for TX power cal.
+ self._streamer = get_streamer(self._usrp, self._dir, chan)
+ if self._dir == 'tx':
+ self._tone_gen.set_streamer(self._streamer)
+ self._chan = chan
+
+ def _get_frequencies(self, start_hint=None, stop_hint=None, step_hint=None):
+ """
+ Return an iterable of frequencies for testing.
+
+ The default will check the hints against the device, but otherwise heed
+ them.
+
+ If a particular device needs to check specific frequencies, then
+ override this.
+ """
+ start_min = \
+ getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))(
+ self._chan).start()
+ start_hint = start_hint or start_min
+ start = max(start_hint, start_min)
+ stop_max = \
+ getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))(
+ self._chan).stop()
+ stop_hint = stop_hint or stop_max
+ stop = min(stop_hint, stop_max)
+ step = step_hint or DEFAULT_FREQ_STEP
+ return numpy.arange(start, stop + step, step)
+
+ def init_frequencies(self, start_hint, stop_hint, step_hint):
+ """
+ Return an iterable of frequencies for testing.
+
+ The default will check the hints against the device, but otherwise heed
+ them.
+
+ Then it will measure the noise floor across frequency to get a good
+ baseline measurement.
+ """
+ freqs = self._get_frequencies(start_hint, stop_hint, step_hint)
+ if self._dir == 'tx':
+ print("===== Measuring noise floor across frequency...")
+ for freq in freqs:
+ self._meas_dev.set_frequency(freq)
+ self._noise[freq] = self._meas_dev.get_power()
+ print("[TX] Noise floor: {:2} MHz => {:+6.2f} dBm"
+ .format(freq/1e6, self._noise[freq]))
+ else: # Rx
+ print("===== Measuring noise floor across frequency and gain...")
+ for freq in freqs:
+ self._noise[freq] = {}
+ tune_req = uhd.types.TuneRequest(freq)
+ self._usrp.set_rx_freq(tune_req, self._chan)
+ for gain in self._gains:
+ self._usrp.set_rx_gain(gain, self._chan)
+ self._noise[freq][gain] = get_usrp_power(self._streamer)
+ print("[RX] Noise floor: {:2} MHz / {} dB => {:+6.2f} dBFS"
+ .format(freq/1e6, gain, self._noise[freq][gain]))
+ return freqs
+
+ def start(self):
+ """
+ Initialize the device for calibration
+ """
+ if self._dir == 'tx':
+ self._tone_gen.start()
+ else:
+ self._meas_dev.enable(True)
+
+ def stop(self, store=True):
+ """
+ Shut down the device after calibration
+ """
+ if self._dir == 'tx':
+ self._tone_gen.stop()
+ else:
+ self._meas_dev.enable(False)
+ if store:
+ self.store()
+
+ def run_rx_cal(self, freq):
+ """
+ Run the actual RX calibration for this frequency.
+ """
+ # Go to highest gain, lock in signal generator
+ self._usrp.set_rx_gain(max(self._gains), self._chan)
+ time.sleep(0.1) # Settling time of the USRP, highly conservative
+ self.log("Locking in signal generator power...")
+ self.log("Requesting input power: {:+.2f} dBm."
+ .format(self.min_detectable_signal))
+ usrp_input_power = self._meas_dev.set_power(self.min_detectable_signal)
+ recvd_power = get_usrp_power(self._streamer)
+ self.log("Got input power: {:+.2f} dBm. Received power: {:.2f} dBFS. "
+ "Requesting new input power: {:+.2f} dBm."
+ .format(usrp_input_power,
+ recvd_power,
+ usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power))
+ usrp_input_power = self._meas_dev.set_power(
+ usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power)
+ siggen_locked = False
+ for _ in range(SIGPWR_LOCK_MAX_ITER):
+ recvd_power = get_usrp_power(self._streamer)
+ if PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM:
+ siggen_locked = True
+ break
+ self.log("Receiving input power: {:+.2f} dBFS.".format(recvd_power))
+ power_delta = PWR_EST_IDEAL_LEVEL - recvd_power
+ # Update power output by the delta from the desired input value:
+ self.log("Requesting input power: {:+.2f} dBm."
+ .format(usrp_input_power + power_delta))
+ usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta)
+ if not siggen_locked:
+ raise RuntimeError(
+ "Unable to lock siggen within {} iterations! Last input power level: {:+6.2f} dBm."
+ .format(SIGPWR_LOCK_MAX_ITER, usrp_input_power))
+ self.log("Locked signal generator in at input power level: {:+6.2f} dBm."
+ .format(usrp_input_power))
+ # Now iterate through gains
+ results = {}
+ # Gains are in decreasing order!
+ last_gain = self._gains[0]
+ for gain in self._gains:
+ self._usrp.set_rx_gain(gain, self._chan) # Set the new gain
+ self.log("Set gain to: {} dB. Got gain: {} dB."
+ .format(gain, self._usrp.get_rx_gain(self._chan)))
+ time.sleep(0.1) # Settling time of the USRP, highly conservative
+ gain_delta = last_gain - gain # This is our gain step
+ if gain_delta:
+ # If we decrease the device gain, we need to crank up the input
+ # power
+ usrp_input_power = self._meas_dev.set_power(
+ min(usrp_input_power + gain_delta, self.max_input_power))
+ # usrp_input_power = self._meas_dev.set_power(usrp_input_power + gain_delta)
+ self.log("New input power is: {:+.2f} dBm".format(usrp_input_power))
+ recvd_power = get_usrp_power(self._streamer)
+ self.log("Received power: {:.2f} dBFS".format(recvd_power))
+ # It's possible that we lose the lock on the signal power, so allow
+ # for a correction
+ if not PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM:
+ power_delta = PWR_EST_IDEAL_LEVEL - recvd_power
+ self.log("Adapting input power to: {:+.2f} dBm."
+ .format(usrp_input_power + power_delta))
+ usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta)
+ self.log("New input power is: {:+.2f} dBm".format(usrp_input_power))
+ # And then of course, measure again
+ recvd_power = get_usrp_power(self._streamer)
+ self.log("Received power: {:.2f} dBFS".format(recvd_power))
+ # Note: The noise power should be way down there, and really
+ # shouldn't matter. We subtract it anyway for formal correctness.
+ recvd_signal_power = subtract_power(recvd_power, self._noise[freq][gain])
+ # A note on the following equation: 'recvd_signal_power' is in dBFS,
+ # and usrp_input_power is in dBm. However, this is the reference
+ # signal, so we need the power (in dBm) that corresponds to 0 dBFS.
+ # The assumption is that digital gain is linear, so what we really
+ # want is usrp_input_power - (recvd_signal_power - 0dBFS), and the
+ # result of the equation is in dBm again. We omit the subtract-by-zero
+ # since our variables don't have units.
+ results[gain] = usrp_input_power - recvd_signal_power
+ self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain]))
+ # If we get too close to the noise floor, we stop
+ if recvd_power - self._noise[freq][gain] <= 1.5:
+ self.log("Can no longer detect input signal. Terminating.")
+ break
+ self.results[freq] = results
+
+ def run_tx_cal(self, freq):
+ """
+ Run the actual TX calibration for this frequency.
+ """
+ results = {}
+ for gain in self._gains:
+ self._usrp.set_tx_gain(gain, self._chan)
+ time.sleep(0.1) # Settling time of the USRP, highly conservative
+ results[gain] = self._meas_dev.get_power()
+ self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain]))
+ self.results[freq] = results
+
+ def store(self):
+ """
+ Return the results object
+ """
+ chan_info = getattr(self._usrp, "get_usrp_{}_info".format(self._dir))(self._chan)
+ cal_key = chan_info.get("{}_ref_power_key".format(self._dir))
+ cal_serial = chan_info.get("{}_ref_power_serial".format(self._dir))
+ cal_data = uhd.usrp.cal.PwrCal(self.cal_name, cal_serial, int(time.time()))
+ if self.temp:
+ cal_data.set_temperature(self.temp)
+ if self.ref_gain:
+ cal_data.set_ref_gain(self.ref_gain)
+ for freq, results in self.results.items():
+ max_power = max(results.values())
+ min_power = min(results.values())
+ cal_data.add_power_table(results, min_power, max_power, freq)
+ database.write_cal_data(
+ cal_key,
+ cal_serial,
+ cal_data.serialize())
+ self.results = {}
+
+class B200Calibrator(USRPCalibratorBase):
+ """
+ B200 calibration
+ """
+ mboard_ids = ('B200', 'B210', 'B200mini', 'B205mini')
+ # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive
+ # a tone. By default, the auto MCR will kick in and set the MCR to 40 Msps,
+ # thus engaging all halfbands.
+ default_rate = 5e6
+ # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within
+ # our estimate. B200 generally has good DC offset / IQ balance performance,
+ # but we still try and avoid DC as much as possible.
+ lo_offset = 10e6
+
+ def __init__(self, usrp, meas_dev, direction, **kwargs):
+ super().__init__(usrp, meas_dev, direction, **kwargs)
+ print("===== Reading temperature... ", end="")
+ self.temp = self._usrp.get_rx_sensor("temp").to_int()
+ print("{} C".format(self.temp))
+ # TODO don't hard code
+ self.ref_gain = 60
+
+class X300Calibrator(USRPCalibratorBase):
+ """
+ X300/X310 calibration
+
+ Notes / TODOs:
+ - The X310 must avoid frequencies that are multiples of 200 MHz; here be
+ harmonics (TODO)
+ - TwinRX needs its own special method. It needs to be run once with one
+ channel in the streamer, and once with two channels in the streamer. Or we
+ come up with a clever way of enabling the other channel without modifying
+ the streamer. A poke to the prop tree might do the trick.
+ """
+ mboard_ids = ('X300', 'X310', 'NI-2974')
+ # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive
+ # a tone. It's 1/40 the master clock rate, which means it'll engage max
+ # halfbands.
+ default_rate = 5e6
+ # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within
+ # our estimate, so it doesn't matter if this device is DC offset / IQ balance
+ # calibrated.
+ lo_offset = 10e6
+
+
+###############################################################################
+# The dispatch function
+###############################################################################
+def get_usrp_calibrator(usrp, meas_dev, direction, **kwargs):
+ """
+ Return a USRP calibrator object.
+ """
+ usrp_type = \
+ getattr(usrp, 'get_usrp_{}_info'.format(direction))().get('mboard_id')
+ if usrp_type is None:
+ raise RuntimeError("Could not determine USRP type!")
+ print("=== Detected USRP type:", usrp_type)
+ for _, obj in inspect.getmembers(sys.modules[__name__]):
+ try:
+ if issubclass(obj, USRPCalibratorBase) \
+ and usrp_type in getattr(obj, 'mboard_ids', ''):
+ return obj(usrp, meas_dev, direction, **kwargs)
+ except TypeError:
+ continue
+ raise RuntimeError("No USRP calibrator object found for device type: {}"
+ .format(usrp_type))
diff --git a/host/python/uhd/usrp/cal/visa.py b/host/python/uhd/usrp/cal/visa.py
new file mode 100644
index 000000000..16c2aff56
--- /dev/null
+++ b/host/python/uhd/usrp/cal/visa.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+VISA-type measurement devices
+"""
+
+import re
+import sys
+import inspect
+import importlib
+
+class VISADevice:
+ """
+ Parent class for VISA/SCPI devices (that can be accessed via PyVISA)
+ """
+ # Must contain a dictionary res_ids. See class below as an example
+
+ def __init__(self, resource):
+ self.res = resource
+ # You can query the exact model etc. like this:
+ # id = self._res.query("*IDN?").strip(
+
+ def init_power_meter(self):
+ """
+ Initialize this device to measure power.
+ """
+ raise NotImplementedError()
+
+ def init_signal_generator(self):
+ """
+ Initialize this device to generate signals.
+ """
+ raise NotImplementedError()
+
+ def set_frequency(self, freq):
+ """
+ Set frequency
+ """
+ raise NotImplementedError()
+
+ def get_power_dbm(self):
+ """
+ Return the received/measured power in dBm (power meter) or return the
+ output power it's currently set to (signal generator).
+ """
+ raise NotImplementedError()
+
+
+class USBPowerMeter(VISADevice):
+ """
+ USB-based power measurement devices
+ """
+ # The keys of res_ids are regular expressions, which have to match the
+ # resource ID of the VISA device. The values are a name for the device, which
+ # is used for informing the user which driver was found.
+ #
+ # A class can match any number of resource IDs. If the commands used depend
+ # on the specific ID, it can be queried in the appropriate init function
+ # using the *IDN? command which is understood by all VISA devices.
+ res_ids = {
+ r'USB\d+::2733::376::\d+::0::INSTR': 'R&S NRP-6A',
+ }
+
+ def init_power_meter(self):
+ """
+ Enable the sensor to read power
+ """
+ self.res.timeout = 5000
+ self.res.write("SENS:AVER:COUN 20")
+ self.res.write("SENS:AVER:COUN:AUTO ON")
+ self.res.write('UNIT:POW DBM')
+ self.res.write('SENS:FUNC "POW:AVG"')
+
+ def init_signal_generator(self):
+ """
+ This class is for power meters, so no bueno
+ """
+ raise RuntimeError("This device cannot be used for signal generation!")
+
+ def set_frequency(self, freq):
+ """
+ Set frequency
+ """
+ self.res.write('SENS:FREQ {}'.format(freq))
+
+ def get_power_dbm(self):
+ """
+ Return measured power in dBm
+ """
+ self.res.write('INIT:IMM')
+ return float(self.res.query('FETCH?'))
+
+###############################################################################
+# The dispatch function
+###############################################################################
+def get_visa_device(resource, key, opt_dict):
+ """
+ Return the VISA device object
+ """
+ def match_res(obj):
+ """
+ Check if a class obj matches the requested key
+ """
+ for pattern, res_id in getattr(obj, 'res_ids', {}).items():
+ if re.match(pattern, key):
+ print("Found VISA device: {}".format(res_id))
+ return True
+ return False
+ # Import additional modules if requested
+ members = inspect.getmembers(sys.modules[__name__])
+ if 'import' in opt_dict:
+ try:
+ print("Loading external module: {}".format(opt_dict.get('import')))
+ external_module = importlib.import_module(opt_dict.get('import'))
+ members += inspect.getmembers(external_module)
+ except (ModuleNotFoundError, ImportError):
+ print("WARNING: Could not import module '{}'"
+ .format(opt_dict.get('import')))
+ # Now browse classes and find one that matches
+ for _, obj in members:
+ try:
+ if issubclass(obj, VISADevice) and match_res(obj):
+ return obj(resource)
+ except TypeError:
+ continue
+ raise RuntimeError("No VISA device class found for key: {}".format(key))