aboutsummaryrefslogtreecommitdiffstats
path: root/host
diff options
context:
space:
mode:
Diffstat (limited to 'host')
-rw-r--r--host/docs/power.dox117
-rw-r--r--host/python/uhd/__init__.py1
-rw-r--r--host/python/uhd/usrp/cal/__init__.py3
-rw-r--r--host/python/uhd/usrp/cal/meas_device.py353
-rw-r--r--host/python/uhd/usrp/cal/tone_gen.py61
-rw-r--r--host/python/uhd/usrp/cal/usrp_calibrator.py408
-rw-r--r--host/python/uhd/usrp/cal/visa.py129
-rw-r--r--host/utils/CMakeLists.txt1
-rw-r--r--host/utils/uhd_power_cal.py250
9 files changed, 1323 insertions, 0 deletions
diff --git a/host/docs/power.dox b/host/docs/power.dox
index 816c74824..7ba93fb61 100644
--- a/host/docs/power.dox
+++ b/host/docs/power.dox
@@ -8,6 +8,11 @@ Starting with UHD 4, UHD comes with reference power level APIs. These allow to
not just set a relative gain level, but configure a USRP to transmit a certain
power, or to estimate received power levels.
+\b DISCLAIMER: USRPs are not factory-calibrated test and measurement devices,
+but general purpose SDR devices. As such, they are not intended to replace
+factory-calibrated power meters or signal generators, nor are they suitable
+devices for doing so.
+
The actual transmitted or received power also depends on the transmitted signal
itself. The absolute power setting is thus a *reference power level*.
The reference power level maps a digital signal level (in dBFS) to an absolute,
@@ -150,5 +155,117 @@ These tables can be stored in three different ways:
- On a file. Calibration data in a file is the most flexible, it can be replaced
easily. It is however local to the computer that stores the calibration data.
+Access to the calibration data usually is done by using the uhd::usrp::cal::database
+class. Calibration data is identified by two identifiers, a key, and a serial.
+
+The \b key is a string that identifies a particular hardware path for an RF signal.
+For example, the B200 and B210 use the key `b2xx_power_cal_rx_rx2` to identify
+the RF path from the RX2 SMA connector on a B200mini all the way to the RFIC. On
+the B210, the same key is used for channels A and B, because the RF paths are
+identical (the PCB traces are symmetrical, and the components are the same as well).
+On the B200mini however, the RF path from RX2 to the RFIC is different (the PCB
+is smaller, for example) and thus the key is different (`b2xxmini_power_cal_rx_rx2`).
+
+The \b serial is usually derived from the serial of the DUT itself, but may also
+include other information, such as the channel. On the B210, the calibration
+serial consists of the motherboard serial plus the channel identifier (for
+example, if the device had a serial of `FFF1234`, the two calibration serials
+would be `FFF1234#A` and `FFF1234#B`. This way, the two channels can have their
+own calibration data.
+
+The key and serial that are used for a specific device can be queried from the
+device by either calling uhd::usrp::multi_usrp::get_usrp_rx_info() when using
+the multi_usrp API, or calling uhd::rfnoc::radio_control::get_rx_power_ref_keys().
+Equivalent calls for TX calibration are also available.
+
+If calibration data is hard-coded as part of UHD, the serial doesn't apply.
+That is because the only calibration data hard-coded in UHD is data that can be
+applied to all devices, and has been vetted against several measurement data
+sets. Such data will carry a much higher calibration error than specifically
+generated calibration data.
+
+\section power_usercal Calibrating your own device
+
+If UHD does not ship its own calibration data for a device, or if the power
+calbration must be finely tuned, it is necessary to manually calbrate the device.
+
+In order to calibrate the transmit power, a calibrated power meter is required.
+To calibrate the receive power, a calibrated signal generator is required. Note
+that it is possible to use a calibrated USRP as power meter / signal generator.
+A calibrated USRP can thus be used to calibrate another USRP.
+
+The calibration is performed using the `uhd_power_cal.py` utility, which is
+usually installed into the utilities directory (for example, `/usr/lib/uhd/utils`
+on some Linux distributions). It requires the Python API of UHD.
+
+The tool will control both the DUT (i.e., the USRP that is to be calibrated) as
+well as the measurement device (power meter or signal generator).
+UHD ships with some drivers for measurement devices, but can be extended for
+others easily (see \ref power_usercal_extend).
+
+In order to run a calibration, the measurement device and the DUT need to be
+connected to the host PC on which the calbration measurement is performed.
+The following command will calibrate a B200 transmit power using a VISA-based
+power meter:
+
+ uhd_power_cal.py --args type=b200 -d tx --meas-dev visa
+
+By default, it will try and calibrate all channels (for a B210). The calibration
+can be further constrained by limiting the frequency range, and the gain/frequency
+steps (for more coarse or fine calibration data).
+
+The tool has hard-coded some sensible defaults for most devices, such as frequency
+and gain steps.
+
+\subsection power_usercal_extend Extending the calibration utility for custom drivers
+
+\subsubsection power_usercal_extend_visa VISA/SCPI based devices
+
+Measurement devices using SCPI commands are particularly easy to add. UHD uses
+PyVISA to access VISA-based devices, so make sure to follow the PyVISA manual
+to set that driver up. For example, USB-based power meters may require setting
+additional privileges or system settings in order for user-space utilities to
+be able to communicate with them.
+
+Assuming PyVISA is working, and your VISA-based measurement device is reachable
+from PyVISA, exposing your VISA-based device is done by creating a Python
+module for your device. Assume the file is called `myvisa.py` and has the
+following content:
+
+~~~{.py~
+from uhd.usrp.cal.visa import VISADevice
+
+class MyVISADevice(VISADevice):
+ res_ids = {r'::1234::': "My VISA Device"}
+
+ def init_power_meter(self):
+ self.res.write("INIT") # Put appropriate SCPI commands here
+
+ # ... all other API calls ...
+~~~
+
+Now you can run the power calibration utility as such:
+
+ uhd_power_cal.py --meas-dev visa -o import=myvisa [other args]
+
+This will try and import `myvisa`, and will automatically detect classes within
+that file. If the VISA device used for calibration matches the resource ID (in
+this example, it needs to contain the substring `::1234::`), this class will be
+chosen. On success, the utility should print a string like
+"Found VISA device: My VISA Device".
+
+The file `visa.py` within the UHD Python module is a useful reference for
+writing custom VISA drivers.
+
+\subsubsection power_usercal_extend_generic Other measurement devices
+
+If a measurement device is not a VISA device, the procedure above can still
+be used. The only requirement is that the devices can be controlled from Python.
+The driver classes must derive either from `uhd.usrp.cal.meas_device.PowerMeterBase`
+or `uhd.usrp.cal.meas_device.PowerGeneratorBase`, respectively.
+
+The file `meas_device.py` in the UHD Python modulues contains the default
+drivers, as well as examples and further documentation.
+
*/
// vim:ft=doxygen:
diff --git a/host/python/uhd/__init__.py b/host/python/uhd/__init__.py
index e12a0626a..8b6a2b36c 100644
--- a/host/python/uhd/__init__.py
+++ b/host/python/uhd/__init__.py
@@ -11,4 +11,5 @@ from . import types
from . import usrp
from . import filters
from . import rfnoc
+from . import dsp
from .libpyuhd.paths import *
diff --git a/host/python/uhd/usrp/cal/__init__.py b/host/python/uhd/usrp/cal/__init__.py
index 77cc3ca35..53de91114 100644
--- a/host/python/uhd/usrp/cal/__init__.py
+++ b/host/python/uhd/usrp/cal/__init__.py
@@ -14,3 +14,6 @@ Python UHD Module: Calibration sub-module
# pylint: disable=wildcard-import
from .libtypes import *
# pylint: enable=wildcard-import
+
+from .meas_device import get_meas_device
+from .usrp_calibrator import get_usrp_calibrator
diff --git a/host/python/uhd/usrp/cal/meas_device.py b/host/python/uhd/usrp/cal/meas_device.py
new file mode 100644
index 000000000..22fe78c11
--- /dev/null
+++ b/host/python/uhd/usrp/cal/meas_device.py
@@ -0,0 +1,353 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Measurement Device Class for UHD Power Calibration
+"""
+
+import sys
+import time
+import inspect
+import importlib
+import numpy
+import uhd
+from .tone_gen import ToneGenerator
+from .visa import get_visa_device
+
+###############################################################################
+# Base Classes
+###############################################################################
+class PowerMeterBase:
+ """
+ Base class for measuring output power (Tx) of the USRP. That means the
+ measurement device is receiving and the USRP (the DUT) is transmitting.
+ """
+ def __init__(self, options):
+ self._options = options
+ self.power_offset = 0
+
+ def set_frequency(self, freq):
+ """
+ Set the frequency of the measurement device.
+ """
+ raise NotImplementedError()
+
+ def get_power(self):
+ """
+ Return the current measured power in dBm.
+ """
+ return self._get_power() + self.power_offset
+
+ def _get_power(self):
+ """
+ Return the current measured power in dBm.
+ """
+ raise NotImplementedError()
+
+ # pylint: disable=no-self-use
+ def update_port(self, chan, antenna):
+ """
+ Tell the device we're measuring chan + antenna next
+ """
+ input("[TX] Connect your power meter to device channel {}, "
+ "antenna {}. Then, hit Enter.".format(chan, antenna))
+ # pylint: enable=no-self-use
+
+class SignalGeneratorBase:
+ """
+ Base class for measuring input power (Rx) of the USRP. That means the
+ measurement device is transmitting and the USRP (the DUT) is receiving.
+ """
+ def __init__(self, options):
+ self._options = options
+ self.power_offset = 0
+ # Make sure to set this before doing RX cal
+ self.max_output_power = None
+
+ def enable(self, enb=True):
+ """
+ Turn on the power generator. By default, it should be off, and only
+ produce a signal when this was called with an argument value of 'True'.
+ """
+ raise NotImplementedError()
+
+ def set_power(self, power_dbm):
+ """
+ Set the input power of the DUT. This will factor in the power offset,
+ and set the measurement device to produce the power that will cause the
+ DUT to receive power_dbm.
+
+ This will coerce to the next possible power available and return the
+ coerced value.
+ """
+ assert self.max_output_power
+ if power_dbm > self.max_output_power:
+ print("[SigGen] WARNING! Trying to set power beyond safe levels. "
+ "Capping output power at {} dBm.".format(self.max_output_power))
+ power_dbm = self.max_output_power
+ return self._set_power(power_dbm + self.power_offset) - self.power_offset
+
+ def get_power(self):
+ """
+ Return the input power of the DUT. This will factor in the power offset,
+ and will return the power level in dBm that is going into the DUT.
+ Use this with set_power(), as not all power levels can be reached.
+ """
+ return self._get_power() - self.power_offset
+
+ def set_frequency(self, freq):
+ """
+ Set the center frequency of the generated signal.
+ """
+ raise NotImplementedError()
+
+ def _set_power(self, power_dbm):
+ """
+ Set the output power of the device in dBm.
+ """
+ raise NotImplementedError()
+
+ def _get_power(self):
+ """
+ Return the output power of the measurement device.
+ """
+ raise NotImplementedError()
+
+ # pylint: disable=no-self-use
+ def update_port(self, chan, antenna):
+ """
+ Tell the device we're measuring chan + antenna next
+ """
+ input("[RX] Connect your signal generator to device channel {}, "
+ "antenna {}. Then, hit Enter.".format(chan, antenna))
+ # pylint: enable=no-self-use
+
+
+###############################################################################
+# Manual Measurement: For masochists, or for small sample sets
+###############################################################################
+class ManualPowerMeter(PowerMeterBase):
+ """
+ Manual measurement: The script does nothing, it just asks the user to
+ manually make changes and return values
+ """
+ key = 'manual'
+
+ def set_frequency(self, freq):
+ """
+ Ask user to set frequency
+ """
+ input("[TX] Set your power meter to following frequency: "
+ "{:.3f} MHz, then hit Enter.".format(freq/1e6))
+
+ def _get_power(self):
+ """
+ Ask user for the power
+ """
+ num_tries = 5
+ for _ in range(num_tries):
+ try:
+ return float(input("[TX] Please enter the measured power in dBm: ")) \
+ + self.power_offset
+ except ValueError:
+ continue
+ raise ValueError("Invalid power value entered.")
+
+class ManualPowerGenerator(SignalGeneratorBase):
+ """
+ Manual measurement: The script does nothing, it just asks the user to
+ manually make changes and return values
+ """
+ key = 'manual'
+ num_tries = 5
+
+ def enable(self, enable=True):
+ """
+ Ask the user to turn the device on or off
+ """
+ input("[RX] Please {} your signal generator and hit Enter."
+ .format("enable" if enable else "disable"))
+
+ def _set_power(self, power_dbm):
+ """
+ Ask for a power, or the closest, and return that
+ """
+ new_power = input(
+ "[RX] Set your signal generator to following output power: "
+ "{:.1f} dBm, then hit Enter, or enter the closest available power: "
+ .format(power_dbm))
+ if not new_power:
+ return power_dbm
+ for _ in range(self.num_tries):
+ try:
+ return float(new_power)
+ except ValueError:
+ new_power = input(
+ "[RX] Set your signal generator to following output power: "
+ "{:.1f} dBm, then hit Enter, or enter the closest available power: "
+ .format(power_dbm))
+ if not new_power:
+ return power_dbm
+ raise ValueError("Invalid power value entered.")
+
+ def _get_power(self):
+ """
+ Ask user for current power
+ """
+ for _ in range(self.num_tries):
+ try:
+ return float(input(
+ "[RX] Please enter the output power in dBm of your "
+ "signal generator: "))
+ except ValueError:
+ continue
+ raise ValueError("Invalid power value entered.")
+
+ # pylint: disable=no-self-use
+ def set_frequency(self, freq):
+ """
+ Set the center frequency of the generated signal.
+ """
+ input("[RX] Set your signal generator to following frequency: {:.3f} MHz, then hit Enter."
+ .format(freq/1e6))
+ # pylint: enable=no-self-use
+
+###############################################################################
+# VISA: Run through a VISA device, using SCPI commands
+###############################################################################
+class VisaPowerMeter(PowerMeterBase):
+ """
+ VISA based Tx measurement device
+ """
+ DEFAULT_VISA_LIB = '@py' # pyvisa-py
+ DEFAULT_VISA_QUERY = "?*::INSTR"
+
+ key = 'visa'
+
+ def __init__(self, options):
+ super().__init__(options)
+ # pylint: disable=import-outside-toplevel
+ # We disable this warning because having pyvisa installed is not a
+ # requirement, so we want to load it as late as possible, and only when
+ # needed.
+ import pyvisa
+ # pylint: enable=import-outside-toplevel
+ visa_lib = options.get('visa_lib', self.DEFAULT_VISA_LIB)
+ visa_query = options.get('visa_query', self.DEFAULT_VISA_QUERY)
+ self._rm = pyvisa.ResourceManager(visa_lib)
+ resources = self._rm.list_resources(visa_query)
+ if len(resources) > 1:
+ print("Found VISA devices:")
+ for resource in resources:
+ print("*" + resource)
+ raise RuntimeError(
+ "Found more than one measurement device. Please limit the query!")
+ if len(resources) == 0:
+ raise RuntimeError("No measurement device found!")
+ self._res = self._rm.open_resource(resources[0])
+ self.visa = get_visa_device(self._res, resources[0], options)
+ self.visa.init_power_meter()
+
+ def set_frequency(self, freq):
+ """
+ Set frequency
+ """
+ self.visa.set_frequency(freq)
+
+ def _get_power(self):
+ """
+ Get power
+ """
+ return self.visa.get_power_dbm()
+
+###############################################################################
+# USRP: Use a pre-calibrated USRP as a measurement device
+###############################################################################
+class USRPPowerGenerator(SignalGeneratorBase):
+ """
+ The power generator is actually a USRP. This only works if the USRP that is
+ used for power/signal generation has been previously calbrated itself.
+ """
+ key = 'usrp'
+
+ def __init__(self, options):
+ super().__init__(options)
+ self._usrp = uhd.usrp.MultiUSRP(options.get('args'))
+ self._rate = float(options.get('rate', 5e6))
+ self._lo_offset = float(options.get('lo_offset', 0))
+ self._chan = int(options.get('chan', 0))
+ self._amplitude = float(options.get('ampl', 1/numpy.sqrt(2)))
+ self._pwr_dbfs = 20 * numpy.log10(self._amplitude)
+ self._tone_freq = 0
+ stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
+ stream_args.channels = [self._chan]
+ self._streamer = self._usrp.get_tx_stream(stream_args)
+ print("==== Creating USRP tone generator. Power offset:", self.power_offset)
+ self._tone_gen = ToneGenerator(self._rate, self._tone_freq, self._amplitude)
+ self._tone_gen.set_streamer(self._streamer)
+
+ def enable(self, enb=True):
+ """
+ Turn the tone generator on or off.
+ """
+ if enb:
+ print("[SigGen] Starting tone generator.")
+ self._tone_gen.start()
+ else:
+ print("[SigGen] Stopping tone generator.")
+ self._tone_gen.stop()
+ time.sleep(0.1) # Give it some time to spin or down
+
+ def set_frequency(self, freq):
+ """
+ Set the center frequency of the generated signal.
+ """
+ print("[SigGen] Channel {}: Tuning signal to {:.3f} MHz."
+ .format(self._chan, freq/1e6))
+ tune_req = uhd.types.TuneRequest(freq, self._lo_offset)
+ self._usrp.set_tx_freq(tune_req, self._chan)
+
+ def _set_power(self, power_dbm):
+ """
+ Set the output power of the device in dBm.
+ """
+ self._usrp.set_tx_power_reference(power_dbm - self._pwr_dbfs, self._chan)
+ return self._get_power()
+
+ def _get_power(self):
+ """
+ Return the output power of the measurement device.
+ """
+ return self._usrp.get_tx_power_reference(self._chan) + self._pwr_dbfs
+
+###############################################################################
+# The dispatch function
+###############################################################################
+def get_meas_device(direction, dev_key, options):
+ """
+ Return the measurement device object
+ """
+ assert direction in ('tx', 'rx')
+ base_class = SignalGeneratorBase if direction == 'rx' else PowerMeterBase
+ opt_dict = {
+ k[0]: k[1] if len(k) > 1 else None for k in [x.split("=", 1) for x in options]
+ }
+ members = inspect.getmembers(sys.modules[__name__])
+ if 'import' in opt_dict:
+ try:
+ print("Loading external module: {}".format(opt_dict.get('import')))
+ external_module = importlib.import_module(opt_dict.get('import'))
+ members += inspect.getmembers(external_module)
+ except (ModuleNotFoundError, ImportError):
+ print("WARNING: Could not import module '{}'"
+ .format(opt_dict.get('import')))
+ for _, obj in members:
+ try:
+ if issubclass(obj, base_class) and dev_key == getattr(obj, 'key', ''):
+ return obj(opt_dict)
+ except TypeError:
+ continue
+ raise RuntimeError("No {} measurement device found for key: {}".format(
+ direction.upper(), dev_key))
diff --git a/host/python/uhd/usrp/cal/tone_gen.py b/host/python/uhd/usrp/cal/tone_gen.py
new file mode 100644
index 000000000..b9d986132
--- /dev/null
+++ b/host/python/uhd/usrp/cal/tone_gen.py
@@ -0,0 +1,61 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Provide a tone generator class for USRPs.
+"""
+
+import threading
+import numpy
+import uhd
+
+class ToneGenerator:
+ """
+ Class that can output a tone from a different thread until told to stop
+ """
+ def __init__(self, rate, freq, ampl, streamer=None):
+ self._streamer = streamer
+ self._buffer = uhd.dsp.signals.get_continuous_tone(rate, freq, ampl)
+ self._run = False
+ self._thread = None
+
+ def set_streamer(self, streamer):
+ """
+ Update streamer object
+ """
+ if self._run:
+ self.stop()
+ self._streamer = streamer
+
+ def start(self):
+ """
+ Spawn the thread in the background
+ """
+ if not self._streamer:
+ raise RuntimeError("No streamer defined!")
+ self._run = True
+ self._thread = threading.Thread(target=self._worker)
+ self._thread.start()
+ self._thread.setName("cal_tx")
+
+ def stop(self):
+ """
+ Stop the transmitter
+ """
+ self._run = False
+ self._thread.join()
+ self._thread = None
+
+ def _worker(self):
+ """ Here is where the action happens """
+ metadata = uhd.types.TXMetadata()
+ while self._run:
+ # Give it a long-ish timeout so we don't have to throttle in here
+ if self._streamer.send(self._buffer, metadata, 1.0) != len(self._buffer):
+ print("WARNING: Failed to transmit entire buffer in ToneGenerator!")
+ # Send an EOB packet with a single zero-valued sample to close out TX
+ metadata.end_of_burst = True
+ self._streamer.send(
+ numpy.array([0], dtype=numpy.complex64), metadata, 0.1)
diff --git a/host/python/uhd/usrp/cal/usrp_calibrator.py b/host/python/uhd/usrp/cal/usrp_calibrator.py
new file mode 100644
index 000000000..a70747201
--- /dev/null
+++ b/host/python/uhd/usrp/cal/usrp_calibrator.py
@@ -0,0 +1,408 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+UHD Power Cal: USRP Calbration Utility Objects
+"""
+
+import time
+import inspect
+import sys
+import numpy
+import uhd
+
+from . import database
+from .tone_gen import ToneGenerator
+
+NUM_SAMPS_PER_EST = int(1e6)
+# Limits for the power estimation algorithm. For good estimates, we want the
+# signal to be at -6 dBFS, but not outside of an upper or lower limit.
+PWR_EST_LLIM = -20
+PWR_EST_IDEAL_LEVEL = -6
+PWR_EST_ULIM = -3
+SIGPWR_LOCK_MAX_ITER = 4
+
+# The default distance between frequencies at which we measure
+DEFAULT_FREQ_STEP = 10e6 # Hz
+DEFAULT_SAMP_RATE = 5e6
+
+def get_streamer(usrp, direction, chan):
+ """
+ Create an appropriate streamer object for this channel
+ """
+ stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
+ stream_args.channels = [chan]
+ return usrp.get_rx_stream(stream_args) if direction == 'rx' \
+ else usrp.get_tx_stream(stream_args)
+
+def get_default_gains(direction, gain_range, gain_step):
+ """
+ Create a equidistant gain range for calibration
+ """
+ assert direction in ('rx', 'tx')
+ if direction == 'tx':
+ return numpy.arange(0, gain_range.stop(), gain_step)
+ return numpy.arange(gain_range.stop(), 0, -gain_step)
+
+def get_usrp_power(streamer, num_samps=NUM_SAMPS_PER_EST, chan=0):
+ """
+ Return the measured input power in dBFS
+
+ The return value is a list of dBFS power values, one per channel.
+ """
+ recv_buffer = numpy.zeros(
+ (streamer.get_num_channels(), num_samps), dtype=numpy.complex64)
+ metadata = uhd.types.RXMetadata()
+ stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
+ stream_cmd.num_samps = num_samps
+ stream_cmd.stream_now = True
+ streamer.issue_stream_cmd(stream_cmd)
+ # Pass in long timeout, so we can rx the entire buffer in one go
+ samps_recvd = streamer.recv(recv_buffer, metadata, 5.0)
+ if samps_recvd != num_samps:
+ raise RuntimeError(
+ "ERROR! get_usrp_power(): Did not receive the correct number of samples!")
+ return uhd.dsp.signals.get_power_dbfs(recv_buffer[chan])
+
+
+def subtract_power(p1_db, p2_db):
+ """
+ Return the power of p1 subtracted from p2, where both are given in logarithmic
+ units.
+ """
+ return 10 * numpy.log10(10**(p1_db/10) - 10**(p2_db/10))
+
+###############################################################################
+# Base Class
+###############################################################################
+class USRPCalibratorBase(object):
+ """
+ Base class for device calibration. Every USRP that can do power calibration
+ needs to implement this.
+ """
+ # These should be overriden to device-specific values
+ max_input_power = -20 # -20 happens to be a safe value for all devices
+ min_detectable_signal = -70
+ default_rate = DEFAULT_SAMP_RATE
+ lo_offset = 0.0
+
+ def __init__(self, usrp, meas_dev, direction, **kwargs):
+ self._usrp = usrp
+ self._meas_dev = meas_dev
+ # This makes sure our measurement will not destroy the DUT
+ self._meas_dev.max_output_power = self.max_input_power
+ self._dir = direction
+ self._desired_gain_step = kwargs.get('gain_step', 10)
+ self._id = usrp.get_usrp_rx_info(0).get('mboard_id')
+ self._mb_serial = usrp.get_usrp_rx_info(0).get('mboard_serial')
+ # Littler helper to print stuff with a device ID prefix.
+ self.log = lambda *args, **kwargs: print("[{}]".format(self._id), *args, **kwargs)
+ # Channel, antenna, and streamer will get updated in update_port()
+ self._chan = None
+ self._ant = ""
+ self._streamer = None
+ # These dictionaries store the results that get written out as well as
+ # the noise floor for reference
+ self.results = {} # This must be of the form results[freq][gain] = power
+ self._noise = {}
+ # The tone generator object is only needed for Tx measurements, and is
+ # initialized conditionaly in init()
+ self._tone_gen = None
+ # The gains can be overridden by the device if non-equidistant gains are
+ # desired. However, gains must be increasing order for Tx measurements,
+ # and in decreasing order for Rx measurements.
+ self._gains = get_default_gains(
+ direction,
+ getattr(self._usrp, 'get_{}_gain_range'.format(self._dir))(),
+ self._desired_gain_step)
+ # You might want to override this, but it's not important. It will
+ # become the 'name' argument for the power cal factory.
+ self.cal_name = self._id + " Power Cal"
+ # The child class can store temperature and ref gain here
+ self.temp = None
+ self.ref_gain = None
+
+ def init(self, rate, tone_freq, amplitude):
+ """
+ Initialize device with finalized values. Not that __init__() needs to
+ finish before the call site knows the rate, so we can' fold this into
+ __init__().
+ """
+ if self._dir == 'tx':
+ self._tone_gen = ToneGenerator(rate, tone_freq, amplitude)
+
+ def update_port(self, chan, antenna):
+ """
+ Notify the device that we've switched channel and/or antenna.
+ """
+ self.log("Switching to channel {}, antenna {}.".format(chan, antenna))
+ self._ant = antenna
+ if chan != self._chan:
+ # This will be an RX streamer for RX power cal, and a TX streamer
+ # for TX power cal.
+ self._streamer = get_streamer(self._usrp, self._dir, chan)
+ if self._dir == 'tx':
+ self._tone_gen.set_streamer(self._streamer)
+ self._chan = chan
+
+ def _get_frequencies(self, start_hint=None, stop_hint=None, step_hint=None):
+ """
+ Return an iterable of frequencies for testing.
+
+ The default will check the hints against the device, but otherwise heed
+ them.
+
+ If a particular device needs to check specific frequencies, then
+ override this.
+ """
+ start_min = \
+ getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))(
+ self._chan).start()
+ start_hint = start_hint or start_min
+ start = max(start_hint, start_min)
+ stop_max = \
+ getattr(self._usrp, 'get_{}_freq_range'.format(self._dir))(
+ self._chan).stop()
+ stop_hint = stop_hint or stop_max
+ stop = min(stop_hint, stop_max)
+ step = step_hint or DEFAULT_FREQ_STEP
+ return numpy.arange(start, stop + step, step)
+
+ def init_frequencies(self, start_hint, stop_hint, step_hint):
+ """
+ Return an iterable of frequencies for testing.
+
+ The default will check the hints against the device, but otherwise heed
+ them.
+
+ Then it will measure the noise floor across frequency to get a good
+ baseline measurement.
+ """
+ freqs = self._get_frequencies(start_hint, stop_hint, step_hint)
+ if self._dir == 'tx':
+ print("===== Measuring noise floor across frequency...")
+ for freq in freqs:
+ self._meas_dev.set_frequency(freq)
+ self._noise[freq] = self._meas_dev.get_power()
+ print("[TX] Noise floor: {:2} MHz => {:+6.2f} dBm"
+ .format(freq/1e6, self._noise[freq]))
+ else: # Rx
+ print("===== Measuring noise floor across frequency and gain...")
+ for freq in freqs:
+ self._noise[freq] = {}
+ tune_req = uhd.types.TuneRequest(freq)
+ self._usrp.set_rx_freq(tune_req, self._chan)
+ for gain in self._gains:
+ self._usrp.set_rx_gain(gain, self._chan)
+ self._noise[freq][gain] = get_usrp_power(self._streamer)
+ print("[RX] Noise floor: {:2} MHz / {} dB => {:+6.2f} dBFS"
+ .format(freq/1e6, gain, self._noise[freq][gain]))
+ return freqs
+
+ def start(self):
+ """
+ Initialize the device for calibration
+ """
+ if self._dir == 'tx':
+ self._tone_gen.start()
+ else:
+ self._meas_dev.enable(True)
+
+ def stop(self, store=True):
+ """
+ Shut down the device after calibration
+ """
+ if self._dir == 'tx':
+ self._tone_gen.stop()
+ else:
+ self._meas_dev.enable(False)
+ if store:
+ self.store()
+
+ def run_rx_cal(self, freq):
+ """
+ Run the actual RX calibration for this frequency.
+ """
+ # Go to highest gain, lock in signal generator
+ self._usrp.set_rx_gain(max(self._gains), self._chan)
+ time.sleep(0.1) # Settling time of the USRP, highly conservative
+ self.log("Locking in signal generator power...")
+ self.log("Requesting input power: {:+.2f} dBm."
+ .format(self.min_detectable_signal))
+ usrp_input_power = self._meas_dev.set_power(self.min_detectable_signal)
+ recvd_power = get_usrp_power(self._streamer)
+ self.log("Got input power: {:+.2f} dBm. Received power: {:.2f} dBFS. "
+ "Requesting new input power: {:+.2f} dBm."
+ .format(usrp_input_power,
+ recvd_power,
+ usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power))
+ usrp_input_power = self._meas_dev.set_power(
+ usrp_input_power + PWR_EST_IDEAL_LEVEL - recvd_power)
+ siggen_locked = False
+ for _ in range(SIGPWR_LOCK_MAX_ITER):
+ recvd_power = get_usrp_power(self._streamer)
+ if PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM:
+ siggen_locked = True
+ break
+ self.log("Receiving input power: {:+.2f} dBFS.".format(recvd_power))
+ power_delta = PWR_EST_IDEAL_LEVEL - recvd_power
+ # Update power output by the delta from the desired input value:
+ self.log("Requesting input power: {:+.2f} dBm."
+ .format(usrp_input_power + power_delta))
+ usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta)
+ if not siggen_locked:
+ raise RuntimeError(
+ "Unable to lock siggen within {} iterations! Last input power level: {:+6.2f} dBm."
+ .format(SIGPWR_LOCK_MAX_ITER, usrp_input_power))
+ self.log("Locked signal generator in at input power level: {:+6.2f} dBm."
+ .format(usrp_input_power))
+ # Now iterate through gains
+ results = {}
+ # Gains are in decreasing order!
+ last_gain = self._gains[0]
+ for gain in self._gains:
+ self._usrp.set_rx_gain(gain, self._chan) # Set the new gain
+ self.log("Set gain to: {} dB. Got gain: {} dB."
+ .format(gain, self._usrp.get_rx_gain(self._chan)))
+ time.sleep(0.1) # Settling time of the USRP, highly conservative
+ gain_delta = last_gain - gain # This is our gain step
+ if gain_delta:
+ # If we decrease the device gain, we need to crank up the input
+ # power
+ usrp_input_power = self._meas_dev.set_power(
+ min(usrp_input_power + gain_delta, self.max_input_power))
+ # usrp_input_power = self._meas_dev.set_power(usrp_input_power + gain_delta)
+ self.log("New input power is: {:+.2f} dBm".format(usrp_input_power))
+ recvd_power = get_usrp_power(self._streamer)
+ self.log("Received power: {:.2f} dBFS".format(recvd_power))
+ # It's possible that we lose the lock on the signal power, so allow
+ # for a correction
+ if not PWR_EST_LLIM <= recvd_power <= PWR_EST_ULIM:
+ power_delta = PWR_EST_IDEAL_LEVEL - recvd_power
+ self.log("Adapting input power to: {:+.2f} dBm."
+ .format(usrp_input_power + power_delta))
+ usrp_input_power = self._meas_dev.set_power(usrp_input_power + power_delta)
+ self.log("New input power is: {:+.2f} dBm".format(usrp_input_power))
+ # And then of course, measure again
+ recvd_power = get_usrp_power(self._streamer)
+ self.log("Received power: {:.2f} dBFS".format(recvd_power))
+ # Note: The noise power should be way down there, and really
+ # shouldn't matter. We subtract it anyway for formal correctness.
+ recvd_signal_power = subtract_power(recvd_power, self._noise[freq][gain])
+ # A note on the following equation: 'recvd_signal_power' is in dBFS,
+ # and usrp_input_power is in dBm. However, this is the reference
+ # signal, so we need the power (in dBm) that corresponds to 0 dBFS.
+ # The assumption is that digital gain is linear, so what we really
+ # want is usrp_input_power - (recvd_signal_power - 0dBFS), and the
+ # result of the equation is in dBm again. We omit the subtract-by-zero
+ # since our variables don't have units.
+ results[gain] = usrp_input_power - recvd_signal_power
+ self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain]))
+ # If we get too close to the noise floor, we stop
+ if recvd_power - self._noise[freq][gain] <= 1.5:
+ self.log("Can no longer detect input signal. Terminating.")
+ break
+ self.results[freq] = results
+
+ def run_tx_cal(self, freq):
+ """
+ Run the actual TX calibration for this frequency.
+ """
+ results = {}
+ for gain in self._gains:
+ self._usrp.set_tx_gain(gain, self._chan)
+ time.sleep(0.1) # Settling time of the USRP, highly conservative
+ results[gain] = self._meas_dev.get_power()
+ self.log("{:2} dB => {:+6.2f} dBm".format(gain, results[gain]))
+ self.results[freq] = results
+
+ def store(self):
+ """
+ Return the results object
+ """
+ chan_info = getattr(self._usrp, "get_usrp_{}_info".format(self._dir))(self._chan)
+ cal_key = chan_info.get("{}_ref_power_key".format(self._dir))
+ cal_serial = chan_info.get("{}_ref_power_serial".format(self._dir))
+ cal_data = uhd.usrp.cal.PwrCal(self.cal_name, cal_serial, int(time.time()))
+ if self.temp:
+ cal_data.set_temperature(self.temp)
+ if self.ref_gain:
+ cal_data.set_ref_gain(self.ref_gain)
+ for freq, results in self.results.items():
+ max_power = max(results.values())
+ min_power = min(results.values())
+ cal_data.add_power_table(results, min_power, max_power, freq)
+ database.write_cal_data(
+ cal_key,
+ cal_serial,
+ cal_data.serialize())
+ self.results = {}
+
+class B200Calibrator(USRPCalibratorBase):
+ """
+ B200 calibration
+ """
+ mboard_ids = ('B200', 'B210', 'B200mini', 'B205mini')
+ # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive
+ # a tone. By default, the auto MCR will kick in and set the MCR to 40 Msps,
+ # thus engaging all halfbands.
+ default_rate = 5e6
+ # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within
+ # our estimate. B200 generally has good DC offset / IQ balance performance,
+ # but we still try and avoid DC as much as possible.
+ lo_offset = 10e6
+
+ def __init__(self, usrp, meas_dev, direction, **kwargs):
+ super().__init__(usrp, meas_dev, direction, **kwargs)
+ print("===== Reading temperature... ", end="")
+ self.temp = self._usrp.get_rx_sensor("temp").to_int()
+ print("{} C".format(self.temp))
+ # TODO don't hard code
+ self.ref_gain = 60
+
+class X300Calibrator(USRPCalibratorBase):
+ """
+ X300/X310 calibration
+
+ Notes / TODOs:
+ - The X310 must avoid frequencies that are multiples of 200 MHz; here be
+ harmonics (TODO)
+ - TwinRX needs its own special method. It needs to be run once with one
+ channel in the streamer, and once with two channels in the streamer. Or we
+ come up with a clever way of enabling the other channel without modifying
+ the streamer. A poke to the prop tree might do the trick.
+ """
+ mboard_ids = ('X300', 'X310', 'NI-2974')
+ # Choosing 5 MHz: It is a small rate, but carries enough bandwidth to receive
+ # a tone. It's 1/40 the master clock rate, which means it'll engage max
+ # halfbands.
+ default_rate = 5e6
+ # Choosing an LO offset of 10 MHz: At 5 Msps, the LO will never be within
+ # our estimate, so it doesn't matter if this device is DC offset / IQ balance
+ # calibrated.
+ lo_offset = 10e6
+
+
+###############################################################################
+# The dispatch function
+###############################################################################
+def get_usrp_calibrator(usrp, meas_dev, direction, **kwargs):
+ """
+ Return a USRP calibrator object.
+ """
+ usrp_type = \
+ getattr(usrp, 'get_usrp_{}_info'.format(direction))().get('mboard_id')
+ if usrp_type is None:
+ raise RuntimeError("Could not determine USRP type!")
+ print("=== Detected USRP type:", usrp_type)
+ for _, obj in inspect.getmembers(sys.modules[__name__]):
+ try:
+ if issubclass(obj, USRPCalibratorBase) \
+ and usrp_type in getattr(obj, 'mboard_ids', ''):
+ return obj(usrp, meas_dev, direction, **kwargs)
+ except TypeError:
+ continue
+ raise RuntimeError("No USRP calibrator object found for device type: {}"
+ .format(usrp_type))
diff --git a/host/python/uhd/usrp/cal/visa.py b/host/python/uhd/usrp/cal/visa.py
new file mode 100644
index 000000000..16c2aff56
--- /dev/null
+++ b/host/python/uhd/usrp/cal/visa.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+VISA-type measurement devices
+"""
+
+import re
+import sys
+import inspect
+import importlib
+
+class VISADevice:
+ """
+ Parent class for VISA/SCPI devices (that can be accessed via PyVISA)
+ """
+ # Must contain a dictionary res_ids. See class below as an example
+
+ def __init__(self, resource):
+ self.res = resource
+ # You can query the exact model etc. like this:
+ # id = self._res.query("*IDN?").strip(
+
+ def init_power_meter(self):
+ """
+ Initialize this device to measure power.
+ """
+ raise NotImplementedError()
+
+ def init_signal_generator(self):
+ """
+ Initialize this device to generate signals.
+ """
+ raise NotImplementedError()
+
+ def set_frequency(self, freq):
+ """
+ Set frequency
+ """
+ raise NotImplementedError()
+
+ def get_power_dbm(self):
+ """
+ Return the received/measured power in dBm (power meter) or return the
+ output power it's currently set to (signal generator).
+ """
+ raise NotImplementedError()
+
+
+class USBPowerMeter(VISADevice):
+ """
+ USB-based power measurement devices
+ """
+ # The keys of res_ids are regular expressions, which have to match the
+ # resource ID of the VISA device. The values are a name for the device, which
+ # is used for informing the user which driver was found.
+ #
+ # A class can match any number of resource IDs. If the commands used depend
+ # on the specific ID, it can be queried in the appropriate init function
+ # using the *IDN? command which is understood by all VISA devices.
+ res_ids = {
+ r'USB\d+::2733::376::\d+::0::INSTR': 'R&S NRP-6A',
+ }
+
+ def init_power_meter(self):
+ """
+ Enable the sensor to read power
+ """
+ self.res.timeout = 5000
+ self.res.write("SENS:AVER:COUN 20")
+ self.res.write("SENS:AVER:COUN:AUTO ON")
+ self.res.write('UNIT:POW DBM')
+ self.res.write('SENS:FUNC "POW:AVG"')
+
+ def init_signal_generator(self):
+ """
+ This class is for power meters, so no bueno
+ """
+ raise RuntimeError("This device cannot be used for signal generation!")
+
+ def set_frequency(self, freq):
+ """
+ Set frequency
+ """
+ self.res.write('SENS:FREQ {}'.format(freq))
+
+ def get_power_dbm(self):
+ """
+ Return measured power in dBm
+ """
+ self.res.write('INIT:IMM')
+ return float(self.res.query('FETCH?'))
+
+###############################################################################
+# The dispatch function
+###############################################################################
+def get_visa_device(resource, key, opt_dict):
+ """
+ Return the VISA device object
+ """
+ def match_res(obj):
+ """
+ Check if a class obj matches the requested key
+ """
+ for pattern, res_id in getattr(obj, 'res_ids', {}).items():
+ if re.match(pattern, key):
+ print("Found VISA device: {}".format(res_id))
+ return True
+ return False
+ # Import additional modules if requested
+ members = inspect.getmembers(sys.modules[__name__])
+ if 'import' in opt_dict:
+ try:
+ print("Loading external module: {}".format(opt_dict.get('import')))
+ external_module = importlib.import_module(opt_dict.get('import'))
+ members += inspect.getmembers(external_module)
+ except (ModuleNotFoundError, ImportError):
+ print("WARNING: Could not import module '{}'"
+ .format(opt_dict.get('import')))
+ # Now browse classes and find one that matches
+ for _, obj in members:
+ try:
+ if issubclass(obj, VISADevice) and match_res(obj):
+ return obj(resource)
+ except TypeError:
+ continue
+ raise RuntimeError("No VISA device class found for key: {}".format(key))
diff --git a/host/utils/CMakeLists.txt b/host/utils/CMakeLists.txt
index 08ace6a0c..4bbcd252b 100644
--- a/host/utils/CMakeLists.txt
+++ b/host/utils/CMakeLists.txt
@@ -50,6 +50,7 @@ set(util_share_sources
set(util_share_sources_py
converter_benchmark.py
convert_cal_data.py
+ uhd_power_cal.py
)
if(ENABLE_USB)
list(APPEND util_share_sources
diff --git a/host/utils/uhd_power_cal.py b/host/utils/uhd_power_cal.py
new file mode 100644
index 000000000..b71e8f3ab
--- /dev/null
+++ b/host/utils/uhd_power_cal.py
@@ -0,0 +1,250 @@
+#!/usr/bin/env python3
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Utility to run power calibrations with USRPs
+"""
+
+import sys
+import math
+import pickle
+import argparse
+import uhd
+
+def parse_args():
+ """ Parse args and return args object """
+ parser = argparse.ArgumentParser(
+ description="Run power level calibration for supported USRPs",
+ )
+ parser.add_argument(
+ '--args', default="",
+ help="USRP Device Args",
+ )
+ parser.add_argument(
+ '-d', '--dir', default='tx',
+ help="Direction: Must be either rx or tx. From the perspective of the "
+ "device: rx == receive power levels, tx == transmit power levels.",
+ choices=['tx', 'rx'])
+ parser.add_argument(
+ '--start',
+ help='Start Frequency. Defaults to lowest available frequency on device. '
+ 'Note that this is only a hint for the device object, which can choose '
+ 'to override this value.', type=float)
+ parser.add_argument(
+ '--stop',
+ help='Stop Frequency. Defaults to highest available frequency on device. '
+ 'Note that this is only a hint for the device object, which can choose '
+ 'to override this value.', type=float)
+ parser.add_argument(
+ '--step',
+ help='Frequency Step. Defaults are device-specific.'
+ 'Note that this is only a hint for the device object, which can choose '
+ 'to override this value. Devices can also measure at non-regular '
+ 'frequencies, e.g., to more accurately cover differences between bands.',
+ type=float)
+ parser.add_argument(
+ '--gain-step', type=float, default=1,
+ help='Gain Step (dB). Defaults to 1 dB.'
+ 'Note that this is only a hint for the device object, which can choose '
+ 'to override this value. Devices can also measure at non-regular '
+ 'gain intervals.')
+ parser.add_argument(
+ '--lo-offset', type=float,
+ help='LO Offset. This gets applied to every tune request. Note that for '
+ 'TX measurements, there is also an offset applied by --tone-freq. '
+ 'The default value is device-dependent.')
+ parser.add_argument(
+ '--amplitude', type=float, default=1./math.sqrt(2),
+ help='Amplitude of the tone that is generated for tx measurements. '
+ 'Default is 1/sqrt(2), or -3 dBFS.')
+ parser.add_argument(
+ '--attenuation', type=float, default=0.0,
+ help='Amount of attenuation between measurement device and DUT. This will '
+ 'be accounted for by simple addition, it is treated like a measurement error. '
+ 'The argument is generally positive, e.g. 30 means 30 dB of attenuation.')
+ parser.add_argument(
+ '--tone-freq', type=float, default=1e6,
+ help='Frequency of the tone that is generated for Tx measurements. This'
+ 'has the same effect as setting an LO offset, except in software.')
+ parser.add_argument(
+ '--antenna', default="*",
+ help="Select antenna port. A value of '*' means that the calibration "
+ "will be repeated on all appropriate antenna ports.")
+ parser.add_argument(
+ '--channels', default="0",
+ help="Select channel. A value of '*' means that the calibration "
+ "will be repeated on all appropriate channels.")
+ parser.add_argument(
+ '--meas-dev', default='manual',
+ help='Type of measurement device that is used')
+ parser.add_argument(
+ '-o', '--meas-option', default=[], action='append',
+ help='Options that are passed to the measurement device')
+ parser.add_argument(
+ '-r', '--rate', type=float,
+ help='Sampling rate at which the calibration is performed')
+ parser.add_argument(
+ '--store', metavar='filename.pickle',
+ help='If provided, will store intermediate cal data. This can be analyzed '
+ 'separately, or loaded into the tool with --load.')
+ parser.add_argument(
+ '--load', metavar='filename.pickle',
+ help='If provided, will load intermediate cal data instead of running a '
+ 'measurement.')
+ return parser.parse_args()
+
+
+def sanitize_args(usrp, args, default_rate):
+ """
+ Check the args against the USRP object.
+ """
+ assert usrp.get_num_mboards() == 1, \
+ "Power calibration tools are designed for a single motherboard!"
+ available_chans = getattr(usrp, 'get_{}_num_channels'.format(args.dir))()
+ if args.channels == '*':
+ # * means all channels
+ channels = list(range(available_chans))
+ else:
+ try:
+ channels = [int(c) for c in args.channels.split(',')]
+ except ValueError:
+ raise ValueError("Invalid channel list: {}".format(args.channels))
+ for chan in channels:
+ assert chan in range(available_chans), \
+ "ERROR: Invalid channel: {}. Should be in 0..{}.".format(
+ chan, available_chans)
+ print("=== Calibrating for channels:", ", ".join([str(x) for x in channels]))
+ available_ants = getattr(usrp, 'get_{}_antennas'.format(args.dir))()
+ if args.antenna == '*':
+ invalid_antennas = ('CAL', 'LOCAL')
+ antennas = [x for x in available_ants if x not in invalid_antennas]
+ else:
+ try:
+ antennas = args.antenna.split(',')
+ except ValueError:
+ raise ValueError("Invalid antenna list: {}".format(args.antenna))
+ for ant in antennas:
+ assert ant in available_ants, \
+ "Invalid antenna: {}. Should be in {}.".format(
+ ant, available_ants)
+ print("=== Calibrating for antennas:", ", ".join([str(x) for x in antennas]))
+ rate = args.rate or default_rate
+ getattr(usrp, 'set_{}_rate'.format(args.dir))(rate)
+ actual_rate = getattr(usrp, 'get_{}_rate'.format(args.dir))()
+ print("=== Requested sampling rate: {} Msps, actual rate: {} Msps"
+ .format(rate/1e6, actual_rate/1e6))
+ if args.dir == 'tx' and abs(args.tone_freq) > actual_rate:
+ raise ValueError(
+ "The TX tone frequency offset of {} kHz is greater than the sampling rate."
+ .format(args.tone_freq / 1e3))
+ return channels, antennas, actual_rate
+
+
+def init_results(pickle_file):
+ """
+ Initialize results from pickle file, or empty dict
+ """
+ if pickle_file is None:
+ return {}
+ with open(pickle_file, 'rb') as results_file:
+ return pickle.load(results_file)
+
+class CalRunner:
+ """
+ Executor of the calibration routines.
+ """
+ def __init__(self, usrp, usrp_cal, meas_dev, args):
+ self.usrp = usrp
+ self.usrp_cal = usrp_cal
+ self.meas_dev = meas_dev
+ self.dir = args.dir
+ self.tone_offset = args.tone_freq if args.dir == 'tx' else 0.0
+ self.lo_offset = args.lo_offset if args.lo_offset else usrp_cal.lo_offset
+ if self.lo_offset:
+ print("=== Using USRP LO offset: {:.2f} MHz"
+ .format(self.lo_offset / 1e6))
+
+ def run(self, chan, freq):
+ """
+ Run all cal steps for a single frequency
+ """
+ print("=== Running calibration at frequency {:.3f} MHz...".format(freq / 1e6))
+ tune_req = uhd.types.TuneRequest(freq, self.lo_offset)
+ getattr(self.usrp, 'set_{}_freq'.format(self.dir))(tune_req, chan)
+ actual_freq = getattr(self.usrp, 'get_{}_freq'.format(self.dir))(chan)
+ if abs(actual_freq - freq) > 1.0:
+ print("WARNING: Frequency was coerced from {:.2f} MHz to {:.2f} MHz!"
+ .format(freq / 1e6, actual_freq / 1e6))
+ self.meas_dev.set_frequency(actual_freq + self.tone_offset)
+ getattr(self.usrp_cal, 'run_{}_cal'.format(self.dir))(freq)
+
+def main():
+ """Go, go, go!"""
+ args = parse_args()
+ print("=== Detecting USRP...")
+ usrp = uhd.usrp.MultiUSRP(args.args)
+ print("=== Measurement direction:", args.dir)
+ print("=== Initializing measurement device...")
+ meas_dev = uhd.usrp.cal.get_meas_device(args.dir, args.meas_dev, args.meas_option)
+ meas_dev.power_offset = args.attenuation
+ # If we're transmitting, then we need to factor in the "attenuation" from us
+ # not transmitting at full scale
+ if args.dir == 'tx':
+ meas_dev.power_offset -= 20 * math.log10(args.amplitude)
+ print("=== Initializing USRP calibration object...")
+ usrp_cal = uhd.usrp.cal.get_usrp_calibrator(
+ usrp, meas_dev, args.dir,
+ gain_step=args.gain_step,
+ )
+ channels, antennas, rate = sanitize_args(usrp, args, usrp_cal.default_rate)
+ results = init_results(args.load)
+ usrp_cal.init(
+ rate=rate,
+ tone_freq=args.tone_freq,
+ amplitude=args.amplitude,
+ )
+ print("=== Launching calibration...")
+ cal_runner = CalRunner(usrp, usrp_cal, meas_dev, args)
+ for chan in channels:
+ if chan not in results:
+ results[chan] = {}
+ for ant in antennas:
+ if ant in results[chan]:
+ print("=== Using pickled data for channel {}, antenna {}."
+ .format(chan, ant))
+ continue
+ print("=== Running calibration for channel {}, antenna {}."
+ .format(chan, ant))
+ # Set up all the objects
+ getattr(usrp, 'set_{}_antenna'.format(args.dir))(ant, chan)
+ meas_dev.update_port(chan, ant)
+ usrp_cal.update_port(chan, ant)
+ freqs = usrp_cal.init_frequencies(args.start, args.stop, args.step)
+ usrp_cal.start() # This will activate siggen
+ # Now calibrate
+ for freq in freqs:
+ try:
+ cal_runner.run(chan, freq)
+ except RuntimeError as ex:
+ print("ERROR: Stopping calibration due to exception: {}"
+ .format(str(ex)))
+ usrp_cal.stop()
+ return 1
+ # Store results for pickling and shut down for next antenna port
+ results[chan][ant] = usrp_cal.results
+ usrp_cal.stop() # This will deactivate siggen and store the data
+ if args.store:
+ print("=== Storing pickled calibration data to {}...".format(args.store))
+ with open(args.store, 'wb') as results_file:
+ pickle.dump(results, results_file)
+ return 0
+
+if __name__ == "__main__":
+ try:
+ sys.exit(main())
+ except (RuntimeError, ValueError) as ex:
+ print("ERROR:", str(ex))
+ sys.exit(1)