1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
#!/usr/bin/env python3
#
# Copyright 2017-2018 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Curses FFT example using Python API
"""
import argparse
import curses as cs
import numpy as np
import uhd
def parse_args():
"""Parse the command line arguments"""
parser = argparse.ArgumentParser()
parser.add_argument("-a", "--args", default="", type=str)
parser.add_argument("-f", "--freq", type=float, required=True)
parser.add_argument("-r", "--rate", default=1e6, type=float)
parser.add_argument("-g", "--gain", type=int, default=10)
parser.add_argument("-c", "--channel", type=int, default=0)
parser.add_argument("-n", "--nsamps", type=int, default=100000)
parser.add_argument("--dyn", type=int, default=60)
parser.add_argument("--ref", type=int, default=0)
return parser.parse_args()
def psd(nfft, samples):
"""Return the power spectral density of `samples`"""
window = np.hamming(nfft)
result = np.multiply(window, samples)
result = np.fft.fftshift(np.fft.fft(result, nfft))
result = np.square(np.abs(result))
result = np.nan_to_num(10.0 * np.log10(result))
result = np.abs(result)
return result
def clip(minval, maxval, value):
"""Clip the value between a and b"""
return min(minval, max(maxval, value))
def main():
"""Create Curses display of FFT"""
args = parse_args()
usrp = uhd.usrp.MultiUSRP(args.args)
# Set the USRP rate, freq, and gain
usrp.set_rx_rate(args.rate, args.channel)
usrp.set_rx_freq(uhd.types.TuneRequest(args.freq), args.channel)
usrp.set_rx_gain(args.gain, args.channel)
# Initialize the curses screen
screen = cs.initscr()
cs.curs_set(0)
cs.noecho()
cs.cbreak()
screen.keypad(1)
height, width = screen.getmaxyx()
# Create a pad for the y-axis
y_axis_width = 10
y_axis = cs.newwin(height, y_axis_width, 0, 0)
# Create the buffer to recv samples
num_samps = max(args.nsamps, width)
samples = np.empty((1, num_samps), dtype=np.complex64)
st_args = uhd.usrp.StreamArgs("fc32", "sc16")
st_args.channels = [args.channel]
metadata = uhd.types.RXMetadata()
streamer = usrp.get_rx_stream(st_args)
buffer_samps = streamer.get_max_num_samps()
recv_buffer = np.zeros((1, buffer_samps), dtype=np.complex64)
stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
stream_cmd.stream_now = True
streamer.issue_stream_cmd(stream_cmd)
db_step = float(args.dyn) / (height - 1.0)
db_start = db_step * int((args.ref - args.dyn) / db_step)
db_stop = db_step * int(args.ref / db_step)
try:
while True:
# Resize the frequency plot on screen resize
screen.clear()
if cs.is_term_resized(height, width):
height, width = screen.getmaxyx()
cs.resizeterm(height, width)
db_step = float(args.dyn) / (height - 1.0)
db_start = db_step * int((args.ref - args.dyn) / db_step)
db_stop = db_step * int(args.ref / db_step)
y_axis.clear()
# Create the vertical (dBfs) axis
y_axis.addstr(0, 1, "{:> 6.2f} |-".format(db_stop))
for i in range(1, height - 1):
label = db_stop - db_step * i
y_axis.addstr(i, 1, "{:> 6.2f} |-".format(label))
try:
y_axis.addstr(height - 1, 1, "{:> 6.2f} |-".format(db_start))
except cs.error:
pass
y_axis.refresh()
# Receive the samples
recv_samps = 0
while recv_samps < num_samps:
samps = streamer.recv(recv_buffer, metadata)
if metadata.error_code != uhd.types.RXMetadataErrorCode.none:
print(metadata.strerror())
if samps:
real_samps = min(num_samps - recv_samps, samps)
samples[:, recv_samps:recv_samps + real_samps] = recv_buffer[:, 0:real_samps]
recv_samps += real_samps
# Get the power in each bin
bins = psd(width, samples[args.channel][0:width])
for i in range(y_axis_width, width):
vertical_slot = clip(height, 0, np.int(bins[i] / db_step))
try:
for j in range(vertical_slot, height):
screen.addch(j, i, '*')
except cs.error:
pass
screen.refresh()
except KeyboardInterrupt:
pass
stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
streamer.issue_stream_cmd(stream_cmd)
cs.curs_set(1)
cs.nocbreak()
screen.keypad(0)
cs.echo()
cs.endwin()
if __name__ == "__main__":
main()
|