diff options
author | Andrej Rode <andrej.rode@ettus.com> | 2016-10-24 10:25:42 -0700 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2017-05-26 16:01:37 -0700 |
commit | 76e9e6393fe1d5a0ccc9e05deb431694921850ec (patch) | |
tree | b7f1097a04fd7a7d0a423a353fbeba037746b782 /tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py | |
parent | 06ff34eaa9e147b11d2698308fa91a5260fee2d2 (diff) | |
download | uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.tar.gz uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.tar.bz2 uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.zip |
gr-usrptest: Initial creation
- new OOT-blocks: phase_calc_ccf hier-block, measurement_sink_f
- new python submodules: flowgraphs, functions, rts_tests
- new apps: usrp_phasealignment.py - cmdline example for manual testing
OOT-Blocks:
- phase_calc_ccf takes two complex input streams and conjugate
multiplys them and extracts the phase from the result and converts
it to degree scale
- measurement_sink_f: takes a float input stream and calculates average and stddev for a
specified number of samples. Start of a measurement is invoked by a
call of start_run() on the block. After a couple of runs average and
stddev can be extracted.
Python modules:
- flowgrahps contains reconfigurable flowgraphs for different GNU
Radio RF test cases
- functions contains functions which are used in different apps/RTS
scripts
- rts_tests contains test cases which are meant to be executed from
the RTS system. Depends on TinyDB, labview_automation
Apps:
- usrp_phasealignment.py is an example how to use the underlying
flowgraph to measure phase differences. Commandline arguments of
uhd_app can be used and several additional arguments can/have to be
specified. Runs a phase difference measurement --runs number of times and averages
phase difference over --duration seconds. Between measurements USRP
sinks are retuned to random frequencies in daughterboard range.
Results are displayed using motherboard serial and daughterboard
serial
Diffstat (limited to 'tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py')
-rw-r--r-- | tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py b/tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py new file mode 100644 index 000000000..f120e2421 --- /dev/null +++ b/tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python2 + +from gnuradio import gr +from gnuradio import uhd +from gnuradio import analog +from usrptest import phase_calc_ccf, measurement_sink_f +from usrptest.functions import log_level +from random import uniform +from ast import literal_eval +import copy +import logging +import sys + + +class phasealignment_fg(gr.top_block): + def __init__(self, uhd_app): + gr.top_block.__init__(self, "Calculate dphi for all USRPs (Rx only)") + ############################## + # Block dicts + ############################## + self.log = logging.getLogger(__name__) + [self.log.removeHandler(h) for h in self.log.handlers] + self.log.addHandler(logging.StreamHandler(sys.stdout)) + self.log.setLevel(log_level(uhd_app.args.log_level)) + self.rx_streams = list() + self.phase_diff_calc = list() + self.measurement_sink = list() + self.uhd_app = copy.copy(uhd_app) + self.tx_app = copy.copy(uhd_app) + self.samp_rate = uhd_app.args.samp_rate + # Create all devices specified in --receiver + # Create all remaining blocks and connect devices to the first port and + # sink + self.uhd_app.args.num_chan = len(self.uhd_app.args.channels) + self.log.info('setting up usrp....') + ############################## + # Setup RX + ############################## + self.log.debug("RX-Setup with args: {}".format(self.uhd_app.args)) + self.uhd_app.setup_usrp(uhd.usrp_source, self.uhd_app.args) + if self.uhd_app.args.measurement_setup is not None: + self.measurement_channels = self.uhd_app.args.measurement_setup.strip( + ).split(',') + # make sure every channels is listed in measurement_channels at least once + if len(set( + self.measurement_channels)) != self.uhd_apps.args.num_chan: + self.uhd_app.vprint( + "[{prefix}] Number of measurement channels has to be the number of used channels." + ) + self.uhd_app.exit(1) + self.measurement_channels = [self.uhd_app.args.channels.index(m) for m in self.measurement_channels] + else: + self.measurement_channels = range(self.uhd_app.args.num_chan) + + self.measurement_channels_names = list() + for chan in self.measurement_channels: + usrp_info = self.uhd_app.usrp.get_usrp_info(chan) + self.measurement_channels_names.append("_".join( + [usrp_info['mboard_serial'], usrp_info['rx_serial']])) + + #Connect channels to first port of d_phi_calc_block and to measurement_sink + for num, chan in enumerate(self.measurement_channels[:-1]): + self.phase_diff_calc.append(phase_calc_ccf()) + self.measurement_sink.append( + measurement_sink_f( + int(self.uhd_app.args.samp_rate * + self.uhd_app.args.duration), self.uhd_app.args.runs)) + self.connect((self.uhd_app.usrp, chan), + (self.phase_diff_calc[num], 0)) + self.connect((self.phase_diff_calc[num], 0), + (self.measurement_sink[num], 0)) + # Connect devices to second port of d_phi_block + for num, chan in enumerate(self.measurement_channels[1:]): + self.connect((self.uhd_app.usrp, chan), + (self.phase_diff_calc[num], 1)) + ############################## + # Setup TX + ############################## + if self.uhd_app.args.tx_channels is not None: + self.tx_app.args.antenna = self.tx_app.args.tx_antenna + self.tx_app.args.channels = [ + int(chan.strip()) + for chan in self.tx_app.args.tx_channels.split(',') + ] + self.tx_app.usrp = None + self.log.debug("TX-Setup with args: {}".format(self.tx_app.args)) + self.tx_app.setup_usrp(uhd.usrp_sink, self.tx_app.args) + self.siggen = analog.sig_source_c(self.samp_rate, + analog.GR_COS_WAVE, + self.tx_app.args.tx_offset, 1.0) + for chan in range(len(self.tx_app.channels)): + self.connect((self.siggen, 0), (self.tx_app.usrp, chan)) + + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + + def retune_frequency(self, band_num=1, bands=1): + ref_chan = self.uhd_app.channels[0] + freq_range = literal_eval( + self.uhd_app.usrp.get_freq_range(ref_chan).__str__( + )) # returns tuple with (start_freq, end_freq, step) + + if bands > 1: + bw = (freq_range[1] - freq_range[0])/bands + freq_range = list(freq_range) + freq_range[0] = freq_range[0] + ((band_num-1) % bands)*bw + freq_range[1] = freq_range[0] + bw + + retune_freq = uniform(freq_range[0], freq_range[1]) + self.log.info('tune all channels to: {:f} MHz'.format(retune_freq / + 1e6)) + self.uhd_app.set_freq(retune_freq) + self.log.info('tune all channels to: {:f} MHz'.format( + self.uhd_app.args.freq / 1e6)) + self.uhd_app.set_freq(self.uhd_app.args.freq) |