diff options
author | Andrej Rode <andrej.rode@ettus.com> | 2016-10-19 16:41:59 -0700 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2017-05-26 16:01:37 -0700 |
commit | 4b7b35570c13766ed575f69d79bf1ded954b6886 (patch) | |
tree | a07e385db0e3e6adca71a2b3c817993e0b2c8422 /tools/gr-usrptest/python/rts_tests | |
parent | 76e9e6393fe1d5a0ccc9e05deb431694921850ec (diff) | |
download | uhd-4b7b35570c13766ed575f69d79bf1ded954b6886.tar.gz uhd-4b7b35570c13766ed575f69d79bf1ded954b6886.tar.bz2 uhd-4b7b35570c13766ed575f69d79bf1ded954b6886.zip |
gr-usrptest: add LabVIEW remote control capability
- require modules labview_automation and hoplite for RTS python module
- new python module: labview_control
Diffstat (limited to 'tools/gr-usrptest/python/rts_tests')
-rw-r--r-- | tools/gr-usrptest/python/rts_tests/CMakeLists.txt | 6 | ||||
-rwxr-xr-x[-rw-r--r--] | tools/gr-usrptest/python/rts_tests/test_phasealignment.py | 93 |
2 files changed, 79 insertions, 20 deletions
diff --git a/tools/gr-usrptest/python/rts_tests/CMakeLists.txt b/tools/gr-usrptest/python/rts_tests/CMakeLists.txt index fbe49995a..03111a9bb 100644 --- a/tools/gr-usrptest/python/rts_tests/CMakeLists.txt +++ b/tools/gr-usrptest/python/rts_tests/CMakeLists.txt @@ -25,3 +25,9 @@ GR_PYTHON_INSTALL( __init__.py test_phasealignment.py DESTINATION ${GR_PYTHON_DIR}/usrptest/rts_tests ) + +GR_PYTHON_INSTALL( + PROGRAMS + test_phasealignment.py + DESTINATION bin +) diff --git a/tools/gr-usrptest/python/rts_tests/test_phasealignment.py b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py index 10642f298..12856bbdb 100644..100755 --- a/tools/gr-usrptest/python/rts_tests/test_phasealignment.py +++ b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py @@ -21,66 +21,106 @@ import unittest from tinydb import TinyDB, Query from usrptest.flowgraphs import phasealignment_fg -from usrptest.functions import setup_phase_alignment_parser, run_test +from usrptest.functions import setup_phase_alignment_parser, setup_tx_phase_alignment_parser, setup_rts_phase_alignment_parser, run_test, log_level +from usrptest.labview_control import lv_control from gnuradio.uhd.uhd_app import UHDApp +import logging +import sys import numpy as np import argparse import time +import copy + class gr_usrp_test(unittest.TestCase): def __init__(self, methodName='runTest', args=None): - super(gr_usrp_test,self).__init__(methodName) + super(gr_usrp_test, self).__init__(methodName) self.args = args + class qa_phasealignment(gr_usrp_test): def setUp(self): + time.sleep(15) #Wait for devices to settle, just in case + if self.args.lv_host is not None: + self.siggen = lv_control.vst_siggen(self.args.lv_host, + self.args.lv_basepath, + self.args.lv_vst_name) + self.switch = lv_control.executive_switch(self.args.lv_host, + self.args.lv_basepath, + self.args.lv_switch_name) + self.siggen.set_freq(self.args.freq + self.args.tx_offset) + self.switch.connect_ports( + *self.args.lv_switch_ports.strip().split(',')) + self.uhd_app = UHDApp(args=self.args) + self.log = logging.getLogger("test_phasealignment") self.tb = phasealignment_fg.phasealignment_fg(self.uhd_app) self.db = TinyDB('phase_db.json') def tearDown(self): self.uhd_app = None self.tb = None + if args.lv_host is not None: + self.siggen.disconnect() + self.switch.disconnect_all() def test_001(self): self.tb.start() time.sleep(2) - results = run_test(self.tb,self.args.runs) # dict key:dev, value: dict{dphase:[],stddev:[]} + results = run_test( + self.tb, + self.args.runs) # dict key:dev, value: dict{dphase:[],stddev:[]} time.sleep(1) - self.first_device = self.tb.measurement_channels_names[:-1] - self.second_device = self.tb.measurement_channels_names[1:] #self.tb.stop() #self.tb.wait() self.time_stamp = time.strftime('%Y%m%d%H%M') self.passed = True - for fdev, sdev in zip(self.first_device,self.second_device): - print('Comparing values for phase difference between {} and {}'.format(fdev, sdev)) - dphase_list = results[fdev]['avg'] - dev_list = results[fdev]['stddev'] + for result in results: + fdev = result['first'] + sdev = result['second'] + self.log.info('Comparing values for phase difference between {} and {}'. + format(fdev, sdev)) + dphase_list = result['avg'] + dev_list = result['stddev'] dphase = np.average(dphase_list) dev = np.average(dev_list) ref_meas = get_reference_meas(self.db, fdev, sdev, self.args.freq) for dphase_i in dphase_list: passed = True if abs(dphase_i - dphase) > self.args.dphi and passed: - print('\t dPhase of a measurement_run differs from average dhpase. dphase_run: {}, dphase_avg: {}'.format(dphase_i, dphase)) + self.log.info( + '\t dPhase of a measurement_run differs from average dhpase. dphase_run: {}, dphase_avg: {}'. + format(dphase_i, dphase)) passed = False if dev > self.args.phasedev: - print('\t dPhase deviates during measurement. stddev: {}'.format(dev)) + self.log.info('\t dPhase deviates during measurement. stddev: {}'. + format(dev)) passed = False if ref_meas: if abs(ref_meas['dphase'] - dphase) > self.args.dphi: - print('\t dPhase differs from reference measurement. Now: {}, reference: {}'.format(dphase, ref_meas['dphase'])) + self.log.info( + '\t dPhase differs from reference measurement. Now: {}, reference: {}'. + format(dphase, ref_meas['dphase'])) if not passed: self.passed = False else: - self.db.insert({'dev1':fdev, 'dev2':sdev, 'timestamp':self.time_stamp, 'dphase':dphase, 'dphase_dev': dev, 'freq': self.args.freq}) + self.db.insert({ + 'dev1': fdev, + 'dev2': sdev, + 'timestamp': self.time_stamp, + 'dphase': dphase, + 'dphase_dev': dev, + 'freq': self.args.freq + }) self.tb.stop() + self.tb.wait() self.assertTrue(self.passed) + def get_previous_meas(db, dev1, dev2, freq): meas = Query() - results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) + results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq + == freq)) prev_result = dict() if results: prev_result = results[0] @@ -89,9 +129,11 @@ def get_previous_meas(db, dev1, dev2, freq): prev_result = result return prev_result + def get_reference_meas(db, dev1, dev2, freq): meas = Query() - results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) + results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq + == freq)) ref_result = dict() if results: ref_result = results[0] @@ -101,21 +143,32 @@ def get_reference_meas(db, dev1, dev2, freq): return ref_result - if __name__ == '__main__': + # parse all arguments parser = argparse.ArgumentParser(conflict_handler='resolve') parser = setup_phase_alignment_parser(parser) + parser = setup_tx_phase_alignment_parser(parser) + parser = setup_rts_phase_alignment_parser(parser) + logging.basicConfig(stream=sys.stdout) UHDApp.setup_argparser(parser=parser) args = parser.parse_args() - def make_suite(testcase_class): + logging.getLogger("test_phasealignment").setLevel(log_level(args.log_level)) + + freqlist = args.freqlist.strip().split(',') + + def make_suite(testcase_class, freqlist): testloader = unittest.TestLoader() testnames = testloader.getTestCaseNames(testcase_class) suite = unittest.TestSuite() for name in testnames: - suite.addTest(testcase_class(name, args=args)) + for freq in freqlist: + test_args = copy.deepcopy(args) + test_args.freq = float(freq) + suite.addTest(testcase_class(name, args=test_args)) return suite # Add tests. alltests = unittest.TestSuite() - alltests.addTest(make_suite(qa_phasealignment)) - result = unittest.TextTestRunner(verbosity=2).run(alltests) # Run tests. + alltests.addTest(make_suite(qa_phasealignment, freqlist)) + result = unittest.TextTestRunner(verbosity=2).run(alltests) # Run tests. + sys.exit(not result.wasSuccessful()) |