diff options
-rw-r--r-- | host/python/uhd/dsp/__init__.py | 10 | ||||
-rw-r--r-- | host/python/uhd/dsp/signals.py | 74 |
2 files changed, 84 insertions, 0 deletions
diff --git a/host/python/uhd/dsp/__init__.py b/host/python/uhd/dsp/__init__.py new file mode 100644 index 000000000..e15a2a5eb --- /dev/null +++ b/host/python/uhd/dsp/__init__.py @@ -0,0 +1,10 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Python UHD Module: DSP sub-module +""" + +from . import signals diff --git a/host/python/uhd/dsp/signals.py b/host/python/uhd/dsp/signals.py new file mode 100644 index 000000000..51e814dc0 --- /dev/null +++ b/host/python/uhd/dsp/signals.py @@ -0,0 +1,74 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +Utilities for generating/analyzing signals +""" + +import math +import numpy +import uhd + +def get_continuous_tone(rate, freq, ampl, desired_size=None, max_size=None): + """ + Return a buffer containing a complex tone at frequency freq. The tone is + continuous, that is, repeating this signal will produce a continuous phase + sinusoid. + The buffer will try and approximate desired_size in length. If it is not + possible to create a buffer smaller than max_size, an exception is thrown. + + Arguments: + rate -- Sampling rate in Hz. + freq -- Tone frequency in Hz + ampl -- Amplitude + desired_size -- Number of samples ideally in returned buffer + max_size -- Number of samples maximally in returned buffer + """ + desired_size = desired_size or 1.0 * rate # About one second worth of data + max_size = max_size or 100e6 + assert rate > freq + rate_int = int(rate) + freq_int = int(freq) + gcd = math.gcd(rate_int, freq_int) + rate_int = rate_int / gcd + freq_int = freq_int / gcd + length = max(freq_int * rate_int, 1) # freq may be zero + tone = \ + numpy.exp(1j * 2 * numpy.pi * (freq/rate) * numpy.arange(length), dtype=numpy.complex64) \ + * ampl + if length < desired_size: + tone = numpy.tile(tone, int(desired_size // length)) + if length > max_size: + raise ValueError("Cannot create a TX buffer! Rate/Freq ratio is too odd.") + return tone + +def get_power_dbfs(signal): + """ + Return the power in dBFS for a digital signal, where fullscale is considered + 1.0. + + A sinusoid with amplitude 1.0 will thus return 0.0 (dBFS). + """ + return 10 * numpy.log10(numpy.var(signal)) + +def get_usrp_power(streamer, num_samps=1e6, chan=0): + """ + Return the measured input power in dBFS + + The return value is a list of dBFS power values, one per channel. + """ + recv_buffer = numpy.zeros( + (streamer.get_num_channels(), num_samps), dtype=numpy.complex64) + metadata = uhd.types.RXMetadata() + stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done) + stream_cmd.num_samps = num_samps + stream_cmd.stream_now = True + streamer.issue_stream_cmd(stream_cmd) + # Pass in long timeout, so we can rx the entire buffer in one go + samps_recvd = streamer.recv(recv_buffer, metadata, 5.0) + if samps_recvd != num_samps: + raise RuntimeError( + "ERROR! get_usrp_power(): Did not receive the correct number of samples!") + return get_power_dbfs(recv_buffer[chan]) |