aboutsummaryrefslogtreecommitdiffstats
path: root/host/examples/python/usrp_power_meter.py
diff options
context:
space:
mode:
Diffstat (limited to 'host/examples/python/usrp_power_meter.py')
-rw-r--r--host/examples/python/usrp_power_meter.py131
1 files changed, 131 insertions, 0 deletions
diff --git a/host/examples/python/usrp_power_meter.py b/host/examples/python/usrp_power_meter.py
new file mode 100644
index 000000000..7cd3dd3f6
--- /dev/null
+++ b/host/examples/python/usrp_power_meter.py
@@ -0,0 +1,131 @@
+#!/usr/bin/env python3
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+Use a calibrated USRP as a power meter
+"""
+
+import sys
+import signal
+import argparse
+import uhd
+
+def parse_args():
+ """Parse the command line arguments"""
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-a", "--args", default="",
+ help="USRP Device Args")
+ parser.add_argument("-f", "--freq", type=float, required=True,
+ help="Center Frequency")
+ parser.add_argument("-o", "--lo-offset", type=float, default=0.0,
+ help="Optional LO offset")
+ parser.add_argument("-c", "--channel", type=int, default=0,
+ help="USRP RX Channel Index")
+ parser.add_argument("-t", "--antenna",
+ help="USRP RX Antenna")
+ parser.add_argument("-r", "--rate", default=1e6, type=float,
+ help="Sampling Rate")
+ parser.add_argument("-b", "--bandwidth", type=float,
+ help="Analog filter bandwidth (if supported)")
+ parser.add_argument("-l", "--ref-level", type=float, default=-15,
+ help="RX reference power level. "
+ "This should be higher than the expected power.")
+ parser.add_argument("-n", "--samps-per-est", type=float, default=1e6,
+ help="Samples per estimate.")
+ parser.add_argument("--mode", choices=['one-shot', 'continuous'], default='one-shot',
+ help="Measure once, or keep measuring until Ctrl-C is pressed.")
+ return parser.parse_args()
+
+def get_streamer(usrp, chan):
+ """
+ Return an RX streamer with fc32 output
+ """
+ stream_args = uhd.usrp.StreamArgs('fc32', 'sc16')
+ stream_args.channels = [chan]
+ return usrp.get_rx_stream(stream_args)
+
+def setup_device(usrp, args, chan):
+ """
+ Apply the settings from args to the device
+ """
+ print("Requesting RX rate of {} Msps...".format(args.rate / 1e6), end='')
+ usrp.set_rx_rate(args.rate, chan)
+ if abs(usrp.get_rx_rate(chan) - args.rate) > 1.0:
+ print("ALMOST. Actual rate: {} Msps"
+ .format(usrp.get_rx_rate(chan) / 1e6))
+ else:
+ print("OK")
+ if args.antenna:
+ print("Setting RX antenna to `{}'...".format(args.antenna), end='')
+ usrp.set_rx_antenna(args.antenna, chan)
+ print("OK")
+ print("Requesting RX frequency of {} MHz...".format(args.freq / 1e6), end='')
+ tr = uhd.types.TuneRequest(args.freq, args.lo_offset)
+ usrp.set_rx_freq(tr, chan)
+ if abs(usrp.get_rx_freq(chan) - args.freq) > 1.0:
+ print("ALMOST. Actual frequency: {} MHz".format(usrp.get_rx_freq(chan) / 1e6))
+ else:
+ print("OK")
+ print("Requesting RX power reference level of {:.2f} dBm..."
+ .format(args.ref_level), end='')
+ usrp.set_rx_power_reference(args.ref_level, chan)
+ ref_level = usrp.get_rx_power_reference(chan)
+ if abs(ref_level - args.ref_level) > 1.0:
+ print("ALMOST. Actual ref level: {:.2f} dBm".format(ref_level))
+ else:
+ print("OK")
+ if args.bandwidth:
+ print("Requesting analog RX bandwidth of {} Msps..."
+ .format(args.bandwidth), end='')
+ usrp.set_rx_bandwidth(args.bandwidth, chan)
+ if abs(usrp.get_rx_bandwidth(chan) - args.bandwidth) > 1.0:
+ print("ALMOST. Actual bandwidth: {} MHz"
+ .format(usrp.get_rx_bandwidth(chan) / 1e6))
+ else:
+ print("OK")
+ return ref_level
+
+RUN = True
+
+def main():
+ """
+ gogogo
+ """
+ args = parse_args()
+ usrp = uhd.usrp.MultiUSRP(args.args)
+ chan = args.channel
+ if chan > usrp.get_rx_num_channels():
+ print("ERROR: Invalid channel selected: {} (only {} channels available!)"
+ .format(chan, usrp.get_rx_num_channels()))
+ return False
+ print("Using channel: {}".format(chan))
+ if not usrp.has_rx_power_reference(chan):
+ print("ERROR: This device is not calibrated for RX!")
+ return False
+ ref_level = setup_device(usrp, args, chan)
+ streamer = get_streamer(usrp, chan)
+ if args.mode == 'continuous':
+ def handle_sigint(_sig, _frame):
+ print("Caught Ctrl-C, exiting...")
+ global RUN
+ RUN = False
+ signal.signal(signal.SIGINT, handle_sigint)
+ while RUN:
+ try:
+ power_dbfs = uhd.dsp.signals.get_usrp_power(
+ streamer, num_samps=int(args.samps_per_est), chan=chan)
+ except RuntimeError:
+ # This is a hack b/c the signal handler is not gracefully handling
+ # SIGINT
+ break
+ power_dbm = power_dbfs + ref_level
+ print("Received power: {:+6.2f} dBm".format(power_dbm))
+ if args.mode == 'one-shot':
+ break
+ return True
+
+if __name__ == "__main__":
+ sys.exit(not main())