diff options
author | Andrej Rode <andrej.rode@ettus.com> | 2016-10-24 10:25:42 -0700 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2017-05-26 16:01:37 -0700 |
commit | 76e9e6393fe1d5a0ccc9e05deb431694921850ec (patch) | |
tree | b7f1097a04fd7a7d0a423a353fbeba037746b782 /tools/gr-usrptest/python | |
parent | 06ff34eaa9e147b11d2698308fa91a5260fee2d2 (diff) | |
download | uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.tar.gz uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.tar.bz2 uhd-76e9e6393fe1d5a0ccc9e05deb431694921850ec.zip |
gr-usrptest: Initial creation
- new OOT-blocks: phase_calc_ccf hier-block, measurement_sink_f
- new python submodules: flowgraphs, functions, rts_tests
- new apps: usrp_phasealignment.py - cmdline example for manual testing
OOT-Blocks:
- phase_calc_ccf takes two complex input streams and conjugate
multiplys them and extracts the phase from the result and converts
it to degree scale
- measurement_sink_f: takes a float input stream and calculates average and stddev for a
specified number of samples. Start of a measurement is invoked by a
call of start_run() on the block. After a couple of runs average and
stddev can be extracted.
Python modules:
- flowgrahps contains reconfigurable flowgraphs for different GNU
Radio RF test cases
- functions contains functions which are used in different apps/RTS
scripts
- rts_tests contains test cases which are meant to be executed from
the RTS system. Depends on TinyDB, labview_automation
Apps:
- usrp_phasealignment.py is an example how to use the underlying
flowgraph to measure phase differences. Commandline arguments of
uhd_app can be used and several additional arguments can/have to be
specified. Runs a phase difference measurement --runs number of times and averages
phase difference over --duration seconds. Between measurements USRP
sinks are retuned to random frequencies in daughterboard range.
Results are displayed using motherboard serial and daughterboard
serial
Diffstat (limited to 'tools/gr-usrptest/python')
-rw-r--r-- | tools/gr-usrptest/python/CMakeLists.txt | 10 | ||||
-rw-r--r-- | tools/gr-usrptest/python/flowgraphs/CMakeLists.txt | 28 | ||||
-rw-r--r-- | tools/gr-usrptest/python/flowgraphs/__init__.py | 14 | ||||
-rw-r--r-- | tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py | 119 | ||||
-rw-r--r-- | tools/gr-usrptest/python/flowgraphs/selftest_fg.py | 122 | ||||
-rw-r--r-- | tools/gr-usrptest/python/functions.py | 133 | ||||
-rw-r--r-- | tools/gr-usrptest/python/phase_calc_ccf.py | 47 | ||||
-rwxr-xr-x | tools/gr-usrptest/python/qa_measurement_sink_f.py | 41 | ||||
-rw-r--r-- | tools/gr-usrptest/python/rts_tests/CMakeLists.txt | 27 | ||||
-rw-r--r-- | tools/gr-usrptest/python/rts_tests/__init__.py | 14 | ||||
-rw-r--r-- | tools/gr-usrptest/python/rts_tests/test_phasealignment.py | 121 |
11 files changed, 674 insertions, 2 deletions
diff --git a/tools/gr-usrptest/python/CMakeLists.txt b/tools/gr-usrptest/python/CMakeLists.txt index 4afda600e..ba3d12394 100644 --- a/tools/gr-usrptest/python/CMakeLists.txt +++ b/tools/gr-usrptest/python/CMakeLists.txt @@ -31,15 +31,21 @@ endif() GR_PYTHON_INSTALL( FILES __init__.py - parsers.py - selftest_fg.py + functions.py phase_calc_ccf.py DESTINATION ${GR_PYTHON_DIR}/usrptest ) +add_subdirectory(flowgraphs) +add_subdirectory(labview_control) +add_subdirectory(rts_tests) + ######################################################################## # Handle the unit tests ######################################################################## include(GrTest) +GR_ADD_TEST(qa_phasealignment ${PYTHON_EXECUTABLE} + ${CMAKE_CURRENT_SOURCE_DIR}/qa_phasealignment.py) set(GR_TEST_TARGET_DEPS gnuradio-usrptest) set(GR_TEST_PYTHON_DIRS ${CMAKE_BINARY_DIR}/swig) +GR_ADD_TEST(qa_measurement_sink_f ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/qa_measurement_sink_f.py) diff --git a/tools/gr-usrptest/python/flowgraphs/CMakeLists.txt b/tools/gr-usrptest/python/flowgraphs/CMakeLists.txt new file mode 100644 index 000000000..7ea94b505 --- /dev/null +++ b/tools/gr-usrptest/python/flowgraphs/CMakeLists.txt @@ -0,0 +1,28 @@ +# Copyright 2011 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. + +######################################################################## +# Install python sources +######################################################################## +GR_PYTHON_INSTALL( + FILES + __init__.py + selftest_fg.py + phasealignment_fg.py DESTINATION ${GR_PYTHON_DIR}/usrptest/flowgraphs +) diff --git a/tools/gr-usrptest/python/flowgraphs/__init__.py b/tools/gr-usrptest/python/flowgraphs/__init__.py new file mode 100644 index 000000000..048e5638a --- /dev/null +++ b/tools/gr-usrptest/python/flowgraphs/__init__.py @@ -0,0 +1,14 @@ +""" +usrptest.flowgraphs +====================================== + +Contents +-------- + +Subpackages +----------- +:: + +The existance of the file turns the folder into a Python module. + +""" diff --git a/tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py b/tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py new file mode 100644 index 000000000..f120e2421 --- /dev/null +++ b/tools/gr-usrptest/python/flowgraphs/phasealignment_fg.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python2 + +from gnuradio import gr +from gnuradio import uhd +from gnuradio import analog +from usrptest import phase_calc_ccf, measurement_sink_f +from usrptest.functions import log_level +from random import uniform +from ast import literal_eval +import copy +import logging +import sys + + +class phasealignment_fg(gr.top_block): + def __init__(self, uhd_app): + gr.top_block.__init__(self, "Calculate dphi for all USRPs (Rx only)") + ############################## + # Block dicts + ############################## + self.log = logging.getLogger(__name__) + [self.log.removeHandler(h) for h in self.log.handlers] + self.log.addHandler(logging.StreamHandler(sys.stdout)) + self.log.setLevel(log_level(uhd_app.args.log_level)) + self.rx_streams = list() + self.phase_diff_calc = list() + self.measurement_sink = list() + self.uhd_app = copy.copy(uhd_app) + self.tx_app = copy.copy(uhd_app) + self.samp_rate = uhd_app.args.samp_rate + # Create all devices specified in --receiver + # Create all remaining blocks and connect devices to the first port and + # sink + self.uhd_app.args.num_chan = len(self.uhd_app.args.channels) + self.log.info('setting up usrp....') + ############################## + # Setup RX + ############################## + self.log.debug("RX-Setup with args: {}".format(self.uhd_app.args)) + self.uhd_app.setup_usrp(uhd.usrp_source, self.uhd_app.args) + if self.uhd_app.args.measurement_setup is not None: + self.measurement_channels = self.uhd_app.args.measurement_setup.strip( + ).split(',') + # make sure every channels is listed in measurement_channels at least once + if len(set( + self.measurement_channels)) != self.uhd_apps.args.num_chan: + self.uhd_app.vprint( + "[{prefix}] Number of measurement channels has to be the number of used channels." + ) + self.uhd_app.exit(1) + self.measurement_channels = [self.uhd_app.args.channels.index(m) for m in self.measurement_channels] + else: + self.measurement_channels = range(self.uhd_app.args.num_chan) + + self.measurement_channels_names = list() + for chan in self.measurement_channels: + usrp_info = self.uhd_app.usrp.get_usrp_info(chan) + self.measurement_channels_names.append("_".join( + [usrp_info['mboard_serial'], usrp_info['rx_serial']])) + + #Connect channels to first port of d_phi_calc_block and to measurement_sink + for num, chan in enumerate(self.measurement_channels[:-1]): + self.phase_diff_calc.append(phase_calc_ccf()) + self.measurement_sink.append( + measurement_sink_f( + int(self.uhd_app.args.samp_rate * + self.uhd_app.args.duration), self.uhd_app.args.runs)) + self.connect((self.uhd_app.usrp, chan), + (self.phase_diff_calc[num], 0)) + self.connect((self.phase_diff_calc[num], 0), + (self.measurement_sink[num], 0)) + # Connect devices to second port of d_phi_block + for num, chan in enumerate(self.measurement_channels[1:]): + self.connect((self.uhd_app.usrp, chan), + (self.phase_diff_calc[num], 1)) + ############################## + # Setup TX + ############################## + if self.uhd_app.args.tx_channels is not None: + self.tx_app.args.antenna = self.tx_app.args.tx_antenna + self.tx_app.args.channels = [ + int(chan.strip()) + for chan in self.tx_app.args.tx_channels.split(',') + ] + self.tx_app.usrp = None + self.log.debug("TX-Setup with args: {}".format(self.tx_app.args)) + self.tx_app.setup_usrp(uhd.usrp_sink, self.tx_app.args) + self.siggen = analog.sig_source_c(self.samp_rate, + analog.GR_COS_WAVE, + self.tx_app.args.tx_offset, 1.0) + for chan in range(len(self.tx_app.channels)): + self.connect((self.siggen, 0), (self.tx_app.usrp, chan)) + + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + + def retune_frequency(self, band_num=1, bands=1): + ref_chan = self.uhd_app.channels[0] + freq_range = literal_eval( + self.uhd_app.usrp.get_freq_range(ref_chan).__str__( + )) # returns tuple with (start_freq, end_freq, step) + + if bands > 1: + bw = (freq_range[1] - freq_range[0])/bands + freq_range = list(freq_range) + freq_range[0] = freq_range[0] + ((band_num-1) % bands)*bw + freq_range[1] = freq_range[0] + bw + + retune_freq = uniform(freq_range[0], freq_range[1]) + self.log.info('tune all channels to: {:f} MHz'.format(retune_freq / + 1e6)) + self.uhd_app.set_freq(retune_freq) + self.log.info('tune all channels to: {:f} MHz'.format( + self.uhd_app.args.freq / 1e6)) + self.uhd_app.set_freq(self.uhd_app.args.freq) diff --git a/tools/gr-usrptest/python/flowgraphs/selftest_fg.py b/tools/gr-usrptest/python/flowgraphs/selftest_fg.py new file mode 100644 index 000000000..cdfc35e74 --- /dev/null +++ b/tools/gr-usrptest/python/flowgraphs/selftest_fg.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python + +from gnuradio import blocks +from gnuradio import gr +from gnuradio import uhd +from usrptest import phase_calc_ccf +from gnuradio.uhd.uhd_app import UHDApp +import numpy as np + +class selftest_fg(gr.top_block): + + def __init__(self, freq, samp_rate, dphase, devices=list()): + gr.top_block.__init__(self, "Generate Signal extract phase") + + ################################################## + # Variables + ################################################## + self.samp_rate = samp_rate + self.freq = 10e3 + self.devices = devices + self.dphase = dphase + self.tx_gain = 50 + self.rx_gain = 50 + self.center_freq = freq + self.omega = 2*np.pi*self.freq + self.steps = np.arange( + self.samp_rate)*float(self.omega)/float(self.samp_rate) + self.reference = self.complex_sine(self.steps) + self.test_signal = self.complex_sine(self.steps+0.5*np.pi) + self.device_test = False + + ################################################## + # Block dicts + ################################################## + self.rx_devices = dict() + self.tx_devices = dict() + self.sink_dict = dict() + self.phase_dict = dict() + self.reference_source = blocks.vector_source_c(self.reference) + + if len(self.devices): + ################################################## + # Devices + ################################################## + self.device_test = True + #To reuse existing setup_usrp() command + for device in self.devices: + # Create and configure all devices + self.rx_devices[device] = uhd.usrp_source( + device, uhd.stream_args( + cpu_format="fc32", channel=range(1))) + self.tx_devices[device] = uhd.usrp_sink( + device, uhd.stream_args( + cpu_format="fc32", channel=range(1))) + self.rx_devices[device].set_samp_rate(self.samp_rate) + self.rx_devices[device].set_center_freq(self.center_freq, 0) + self.rx_devices[device].set_gain(self.rx_gain, 0) + self.tx_devices[device].set_samp_rate(self.samp_rate) + self.tx_devices[device].set_center_freq(self.center_freq, 0) + self.tx_devices[device].set_gain(self.tx_gain, 0) + self.sink_dict[device] = blocks.vector_sink_f() + self.phase_dict[device] = phase_calc_ccf( + self.samp_rate, self.freq) + for device in self.tx_devices.values(): + self.connect((self.reference_source, 0), (device, 0)) + + for device_key in self.rx_devices.keys(): + self.connect( + (self.rx_devices[device_key], 0), (self.phase_dict[device_key], 0)) + self.connect((self.reference_source, 0), + (self.phase_dict[device_key], 1)) + self.connect( + (self.phase_dict[device_key], 0), (self.sink_dict[device_key], 0)) + # Debug options + # self.sink_list.append(blocks.vector_sink_c()) + #self.connect((device, 0), (self.sink_list[-1], 0)) + # self.sink_list.append(blocks.vector_sink_c()) + #self.connect((self.reference_source, 0), (self.sink_list[-1], 0)) + else: + ################################################## + # Blocks + ################################################## + self.result = blocks.vector_sink_f(1) + self.test_source = blocks.vector_source_c(self.test_signal) + self.block_phase_calc = phase_calc_ccf( + self.samp_rate, self.freq) + + ################################################## + # Connections + ################################################## + self.connect((self.reference_source, 0), (self.block_phase_calc, 1)) + self.connect((self.test_source, 0), (self.block_phase_calc, 0)) + self.connect((self.block_phase_calc, 0), (self.result, 0)) + def complex_sine(self, steps): + return np.exp(1j*steps) + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + + def run(self): + self.start() + self.wait() + if self.device_test: + data = dict() + for device_key in self.sink_dict.keys(): + curr_data = self.sink_dict[device_key].data() + curr_data = curr_data[int(0.2*self.samp_rate):-int(0.2*self.samp_rate)] + phase_avg = np.average(curr_data) + if (np.max(curr_data) < phase_avg+self.dphase*0.5) and (np.min(curr_data) > phase_avg-self.dphase*0.5): + data[device_key] = phase_avg + else: + print("Error phase not settled") + + #Debug + # plt.ylim(-1, 1) + # plt.xlim(self.samp_rate/2.), (self.samp_rate/2.)+1000) + #for key in data: + # plt.plot(data[key]) + return data diff --git a/tools/gr-usrptest/python/functions.py b/tools/gr-usrptest/python/functions.py new file mode 100644 index 000000000..e3f7958b1 --- /dev/null +++ b/tools/gr-usrptest/python/functions.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python2 +import numpy as np +import time +import copy +import logging + + +def setup_phase_alignment_parser(parser): + test_group = parser.add_argument_group( + 'Phase alignment specific arguments') + test_group.add_argument( + '--runs', + default=10, + type=int, + help='Number of times to retune and measure d_phi') + test_group.add_argument( + '--duration', + default=5.0, + type=float, + help='Duration of a measurement run') + test_group.add_argument( + '--measurement-setup', + type=str, + help='Comma-seperated list of channel ids. Phase difference will be calculated between consecutive channels. default=(0,1,2,..,M-1) M: num_chan' + ) + test_group.add_argument( + '--log-level', + type=str, + choices=["critical", "error", "warning", "info", "debug"], + default="info") + test_group.add_argument( + '--freq-bands', + type=int, + help="Number of frequency bands in daughterboard range to randomly retune to", + default=1) + return parser + + +def setup_tx_phase_alignment_parser(parser): + tx_group = parser.add_argument_group( + 'TX Phase alignment specific arguments.') + tx_group.add_argument( + '--tx-channels', type=str, help='which channels to use') + tx_group.add_argument( + '--tx-antenna', + type=str, + help='comma-separated list of channel antennas for tx') + tx_group.add_argument( + '--tx-offset', + type=float, + help='frequency offset in Hz which should be added to center frequency for transmission' + ) + return parser + + +def setup_rts_phase_alignment_parser(parser): + rts_group = parser.add_argument_group('RTS Phase alignment specific arguments') + rts_group.add_argument( + '-pd', '--phasedev', + type=float, + default=1.0, + help='maximum phase standard deviation of dphi in a run which is considered settled (in deg)') + rts_group.add_argument( + '-dp', + '--dphi', + type=float, + default=2.0, + help='maximum allowed d_phase deviation between runs (in deg)') + return parser + +def setup_manual_phase_alignment_parser(parser): + manual_group = parser.add_argument_group( + 'Manual Phase alignment specific arguments') + manual_group.add_argument( + '--plot', + dest='plot', + action='store_true', + help='Set this argument to enable plotting results with matplotlib' + ) + manual_group.add_argument( + '--auto', + action='store_true', + help='Set this argument to enable automatic selection of test frequencies' + ) + manual_group.add_argument( + '--start-freq', + type=float, + default=0.0, + help='Start frequency for automatic selection' + ), + manual_group.add_argument( + '--stop-freq', + type=float, + default=0.0, + help='Stop frequency for automatic selection') + + parser.set_defaults(plot=False,auto=False) + return parser + + +def process_measurement_sinks(top_block): + data = list() + curr_data = dict() + for num, chan in enumerate(top_block.measurement_channels[:-1]): + curr_data['avg'] = list(top_block.measurement_sink[num].get_avg()) + curr_data['stddev'] = list(top_block.measurement_sink[num].get_stddev( + )) + curr_data['first'] = top_block.measurement_channels_names[num] + curr_data['second'] = top_block.measurement_channels_names[num + 1] + data.append(copy.copy(curr_data)) + return data + + +def run_test(top_block, ntimes): + results = dict() + num_sinks = len(top_block.measurement_sink) + for i in xrange(ntimes): + #tune frequency to random position and back to specified frequency + top_block.retune_frequency(bands=top_block.uhd_app.args.freq_bands,band_num=i+1) + time.sleep(2) + #trigger start in all measurement_sinks + for sink in top_block.measurement_sink: + sink.start_run() + #wait until every measurement_sink is ready with the current run + while (sum([ms.get_run() for ms in top_block.measurement_sink]) < ( + (i + 1) * num_sinks)): + time.sleep(1) + results = process_measurement_sinks(top_block) + return results + + +def log_level(string): + return getattr(logging, string.upper()) diff --git a/tools/gr-usrptest/python/phase_calc_ccf.py b/tools/gr-usrptest/python/phase_calc_ccf.py new file mode 100644 index 000000000..fe3cf55a8 --- /dev/null +++ b/tools/gr-usrptest/python/phase_calc_ccf.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2016 Ettus Research LLC. +# +# This is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr +from gnuradio import blocks +import numpy as np + + +class phase_calc_ccf(gr.hier_block2): + """ + docstring for block phase_calc_ccf + """ + + def __init__(self): + gr.hier_block2.__init__( + self, + "phase_calc_ccf", + gr.io_signature(2, 2, gr.sizeof_gr_complex), # Input signature + gr.io_signature(1, 1, gr.sizeof_float)) # Output signature + self.block = dict() + self.block['mult_conj'] = blocks.multiply_conjugate_cc() + self.block['arg'] = blocks.complex_to_arg() + self.block['mult_const'] = blocks.multiply_const_ff(180.0 / np.pi) + + self.connect((self, 0), (self.block['mult_conj'], 0)) + self.connect((self, 1), (self.block['mult_conj'], 1)) + self.connect((self.block['mult_conj'], 0), (self.block['arg'], 0)) + self.connect((self.block['arg'], 0), (self.block['mult_const'], 0)) + self.connect((self.block['mult_const'], 0), (self, 0)) diff --git a/tools/gr-usrptest/python/qa_measurement_sink_f.py b/tools/gr-usrptest/python/qa_measurement_sink_f.py new file mode 100755 index 000000000..ceb9913ae --- /dev/null +++ b/tools/gr-usrptest/python/qa_measurement_sink_f.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2016 Ettus Research LLC. +# +# This is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +from gnuradio import blocks +import usrptest_swig as usrptest + +class qa_measurement_sink_f (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + # set up fg + self.tb.run () + # check data + + +if __name__ == '__main__': + gr_unittest.run(qa_measurement_sink_f, "qa_measurement_sink_f.xml") diff --git a/tools/gr-usrptest/python/rts_tests/CMakeLists.txt b/tools/gr-usrptest/python/rts_tests/CMakeLists.txt new file mode 100644 index 000000000..fbe49995a --- /dev/null +++ b/tools/gr-usrptest/python/rts_tests/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright 2011 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. + +######################################################################## +# Install python sources +######################################################################## +GR_PYTHON_INSTALL( + FILES + __init__.py + test_phasealignment.py DESTINATION ${GR_PYTHON_DIR}/usrptest/rts_tests +) diff --git a/tools/gr-usrptest/python/rts_tests/__init__.py b/tools/gr-usrptest/python/rts_tests/__init__.py new file mode 100644 index 000000000..048e5638a --- /dev/null +++ b/tools/gr-usrptest/python/rts_tests/__init__.py @@ -0,0 +1,14 @@ +""" +usrptest.flowgraphs +====================================== + +Contents +-------- + +Subpackages +----------- +:: + +The existance of the file turns the folder into a Python module. + +""" diff --git a/tools/gr-usrptest/python/rts_tests/test_phasealignment.py b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py new file mode 100644 index 000000000..10642f298 --- /dev/null +++ b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2016 Ettus Research LLC. +# +# This is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# +import unittest +from tinydb import TinyDB, Query +from usrptest.flowgraphs import phasealignment_fg +from usrptest.functions import setup_phase_alignment_parser, run_test +from gnuradio.uhd.uhd_app import UHDApp +import numpy as np +import argparse +import time + +class gr_usrp_test(unittest.TestCase): + def __init__(self, methodName='runTest', args=None): + super(gr_usrp_test,self).__init__(methodName) + self.args = args + +class qa_phasealignment(gr_usrp_test): + def setUp(self): + self.uhd_app = UHDApp(args=self.args) + self.tb = phasealignment_fg.phasealignment_fg(self.uhd_app) + self.db = TinyDB('phase_db.json') + + def tearDown(self): + self.uhd_app = None + self.tb = None + + def test_001(self): + self.tb.start() + time.sleep(2) + results = run_test(self.tb,self.args.runs) # dict key:dev, value: dict{dphase:[],stddev:[]} + time.sleep(1) + self.first_device = self.tb.measurement_channels_names[:-1] + self.second_device = self.tb.measurement_channels_names[1:] + #self.tb.stop() + #self.tb.wait() + self.time_stamp = time.strftime('%Y%m%d%H%M') + self.passed = True + for fdev, sdev in zip(self.first_device,self.second_device): + print('Comparing values for phase difference between {} and {}'.format(fdev, sdev)) + dphase_list = results[fdev]['avg'] + dev_list = results[fdev]['stddev'] + dphase = np.average(dphase_list) + dev = np.average(dev_list) + ref_meas = get_reference_meas(self.db, fdev, sdev, self.args.freq) + for dphase_i in dphase_list: + passed = True + if abs(dphase_i - dphase) > self.args.dphi and passed: + print('\t dPhase of a measurement_run differs from average dhpase. dphase_run: {}, dphase_avg: {}'.format(dphase_i, dphase)) + passed = False + if dev > self.args.phasedev: + print('\t dPhase deviates during measurement. stddev: {}'.format(dev)) + passed = False + if ref_meas: + if abs(ref_meas['dphase'] - dphase) > self.args.dphi: + print('\t dPhase differs from reference measurement. Now: {}, reference: {}'.format(dphase, ref_meas['dphase'])) + if not passed: + self.passed = False + else: + self.db.insert({'dev1':fdev, 'dev2':sdev, 'timestamp':self.time_stamp, 'dphase':dphase, 'dphase_dev': dev, 'freq': self.args.freq}) + self.tb.stop() + self.assertTrue(self.passed) + +def get_previous_meas(db, dev1, dev2, freq): + meas = Query() + results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) + prev_result = dict() + if results: + prev_result = results[0] + for result in results: + if result['timestamp'] > prev_result['timestamp']: + prev_result = result + return prev_result + +def get_reference_meas(db, dev1, dev2, freq): + meas = Query() + results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) + ref_result = dict() + if results: + ref_result = results[0] + for result in results: + if result['timestamp'] < ref_result['timestamp']: + ref_result = result + return ref_result + + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(conflict_handler='resolve') + parser = setup_phase_alignment_parser(parser) + UHDApp.setup_argparser(parser=parser) + args = parser.parse_args() + def make_suite(testcase_class): + testloader = unittest.TestLoader() + testnames = testloader.getTestCaseNames(testcase_class) + suite = unittest.TestSuite() + for name in testnames: + suite.addTest(testcase_class(name, args=args)) + return suite + + # Add tests. + alltests = unittest.TestSuite() + alltests.addTest(make_suite(qa_phasealignment)) + result = unittest.TextTestRunner(verbosity=2).run(alltests) # Run tests. |