From a603c97d06bf26271141f875a55c3c76f26057c6 Mon Sep 17 00:00:00 2001 From: Paul David Date: Thu, 29 Jun 2017 16:15:58 -0400 Subject: python: Added curses frequency plot example --- host/examples/python/CMakeLists.txt | 1 + host/examples/python/curses_fft.py | 152 ++++++++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100755 host/examples/python/curses_fft.py diff --git a/host/examples/python/CMakeLists.txt b/host/examples/python/CMakeLists.txt index 4b75f57bd..669c80203 100644 --- a/host/examples/python/CMakeLists.txt +++ b/host/examples/python/CMakeLists.txt @@ -7,6 +7,7 @@ SET(python_examples rx_to_file.py tx_waveforms.py + curses_fft.py ) UHD_INSTALL(PROGRAMS ${python_examples} DESTINATION ${PKG_LIB_DIR}/examples/python COMPONENT examples) diff --git a/host/examples/python/curses_fft.py b/host/examples/python/curses_fft.py new file mode 100755 index 000000000..7a447cc67 --- /dev/null +++ b/host/examples/python/curses_fft.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python +# +# Copyright 2017-2018 Ettus Research, a National Instruments Company +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Curses FFT example using Python API +""" + +import argparse +import curses as cs +import numpy as np +import uhd + + +def parse_args(): + """Parse the command line arguments""" + parser = argparse.ArgumentParser() + parser.add_argument("-a", "--args", default="", type=str) + parser.add_argument("-f", "--freq", type=float, required=True) + parser.add_argument("-r", "--rate", default=1e6, type=float) + parser.add_argument("-g", "--gain", type=int, default=10) + parser.add_argument("-c", "--channel", type=int, default=0) + parser.add_argument("-n", "--nsamps", type=int, default=100000) + parser.add_argument("--dyn", type=int, default=60) + parser.add_argument("--ref", type=int, default=0) + return parser.parse_args() + + +def psd(nfft, samples): + """Return the power spectral density of `samples`""" + window = np.hamming(nfft) + result = np.multiply(window, samples) + result = np.fft.fftshift(np.fft.fft(result, nfft)) + result = np.square(np.abs(result)) + result = np.nan_to_num(10.0 * np.log10(result)) + result = np.abs(result) + return result + + +def clip(minval, maxval, value): + """Clip the value between a and b""" + return min(minval, max(maxval, value)) + + +def main(): + """Create Curses display of FFT""" + args = parse_args() + usrp = uhd.usrp.MultiUSRP(args.args) + + # Set the USRP rate, freq, and gain + usrp.set_rx_rate(args.rate, args.channel) + usrp.set_rx_freq(uhd.types.TuneRequest(args.freq), args.channel) + usrp.set_rx_gain(args.gain, args.channel) + + # Initialize the curses screen + screen = cs.initscr() + cs.curs_set(0) + cs.noecho() + cs.cbreak() + screen.keypad(1) + height, width = screen.getmaxyx() + + # Create a pad for the y-axis + y_axis_width = 10 + y_axis = cs.newwin(height, y_axis_width, 0, 0) + + # Create the buffer to recv samples + num_samps = max(args.nsamps, width) + samples = np.empty((1, num_samps), dtype=np.complex64) + + st_args = uhd.usrp.StreamArgs("fc32", "sc16") + st_args.channels = [args.channel] + + metadata = uhd.types.RXMetadata() + streamer = usrp.get_rx_stream(st_args) + buffer_samps = streamer.get_max_num_samps() + recv_buffer = np.zeros((1, buffer_samps), dtype=np.complex64) + + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) + stream_cmd.stream_now = True + streamer.issue_stream_cmd(stream_cmd) + + db_step = float(args.dyn) / (height - 1.0) + db_start = db_step * int((args.ref - args.dyn) / db_step) + db_stop = db_step * int(args.ref / db_step) + + try: + while True: + # Resize the frequency plot on screen resize + screen.clear() + if cs.is_term_resized(height, width): + height, width = screen.getmaxyx() + cs.resizeterm(height, width) + + db_step = float(args.dyn) / (height - 1.0) + db_start = db_step * int((args.ref - args.dyn) / db_step) + db_stop = db_step * int(args.ref / db_step) + + y_axis.clear() + + # Create the vertical (dBfs) axis + y_axis.addstr(0, 1, "{:> 6.2f} |-".format(db_stop)) + for i in range(1, height - 1): + label = db_stop - db_step * i + y_axis.addstr(i, 1, "{:> 6.2f} |-".format(label)) + try: + y_axis.addstr(height - 1, 1, "{:> 6.2f} |-".format(db_start)) + except cs.error: + pass + y_axis.refresh() + + # Receive the samples + recv_samps = 0 + while recv_samps < num_samps: + samps = streamer.recv(recv_buffer, metadata) + + if metadata.error_code != uhd.types.RXMetadataErrorCode.none: + print(metadata.strerror()) + if samps: + real_samps = min(num_samps - recv_samps, samps) + samples[:, recv_samps:recv_samps + real_samps] = recv_buffer[:, 0:real_samps] + recv_samps += real_samps + + # Get the power in each bin + bins = psd(width, samples[args.channel][0:width]) + + for i in range(y_axis_width, width): + vertical_slot = clip(height, 0, np.int(bins[i] / db_step)) + try: + for j in range(vertical_slot, height): + screen.addch(j, i, '*') + except cs.error: + pass + screen.refresh() + + except KeyboardInterrupt: + pass + + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) + streamer.issue_stream_cmd(stream_cmd) + + cs.curs_set(1) + cs.nocbreak() + screen.keypad(0) + cs.echo() + cs.endwin() + + +if __name__ == "__main__": + main() -- cgit v1.2.3