diff options
| author | Andrej Rode <andrej.rode@ettus.com> | 2016-10-19 16:41:59 -0700 | 
|---|---|---|
| committer | Martin Braun <martin.braun@ettus.com> | 2017-05-26 16:01:37 -0700 | 
| commit | 4b7b35570c13766ed575f69d79bf1ded954b6886 (patch) | |
| tree | a07e385db0e3e6adca71a2b3c817993e0b2c8422 /tools/gr-usrptest/python/rts_tests | |
| parent | 76e9e6393fe1d5a0ccc9e05deb431694921850ec (diff) | |
| download | uhd-4b7b35570c13766ed575f69d79bf1ded954b6886.tar.gz uhd-4b7b35570c13766ed575f69d79bf1ded954b6886.tar.bz2 uhd-4b7b35570c13766ed575f69d79bf1ded954b6886.zip | |
gr-usrptest: add LabVIEW remote control capability
 - require modules labview_automation and hoplite for RTS python module
 - new python module: labview_control
Diffstat (limited to 'tools/gr-usrptest/python/rts_tests')
| -rw-r--r-- | tools/gr-usrptest/python/rts_tests/CMakeLists.txt | 6 | ||||
| -rwxr-xr-x[-rw-r--r--] | tools/gr-usrptest/python/rts_tests/test_phasealignment.py | 93 | 
2 files changed, 79 insertions, 20 deletions
| diff --git a/tools/gr-usrptest/python/rts_tests/CMakeLists.txt b/tools/gr-usrptest/python/rts_tests/CMakeLists.txt index fbe49995a..03111a9bb 100644 --- a/tools/gr-usrptest/python/rts_tests/CMakeLists.txt +++ b/tools/gr-usrptest/python/rts_tests/CMakeLists.txt @@ -25,3 +25,9 @@ GR_PYTHON_INSTALL(      __init__.py      test_phasealignment.py DESTINATION ${GR_PYTHON_DIR}/usrptest/rts_tests  ) + +GR_PYTHON_INSTALL( +    PROGRAMS +    test_phasealignment.py +    DESTINATION bin +) diff --git a/tools/gr-usrptest/python/rts_tests/test_phasealignment.py b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py index 10642f298..12856bbdb 100644..100755 --- a/tools/gr-usrptest/python/rts_tests/test_phasealignment.py +++ b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py @@ -21,66 +21,106 @@  import unittest  from tinydb import TinyDB, Query  from usrptest.flowgraphs import phasealignment_fg -from usrptest.functions import setup_phase_alignment_parser, run_test +from usrptest.functions import setup_phase_alignment_parser, setup_tx_phase_alignment_parser, setup_rts_phase_alignment_parser, run_test, log_level +from usrptest.labview_control import lv_control  from gnuradio.uhd.uhd_app import UHDApp +import logging +import sys  import numpy as np  import argparse  import time +import copy +  class gr_usrp_test(unittest.TestCase):      def __init__(self, methodName='runTest', args=None): -        super(gr_usrp_test,self).__init__(methodName) +        super(gr_usrp_test, self).__init__(methodName)          self.args = args +  class qa_phasealignment(gr_usrp_test):      def setUp(self): +        time.sleep(15) #Wait for devices to settle, just in case +        if self.args.lv_host is not None: +            self.siggen = lv_control.vst_siggen(self.args.lv_host, +                                                self.args.lv_basepath, +                                                self.args.lv_vst_name) +            self.switch = lv_control.executive_switch(self.args.lv_host, +                                                      self.args.lv_basepath, +                                                      self.args.lv_switch_name) +            self.siggen.set_freq(self.args.freq + self.args.tx_offset) +            self.switch.connect_ports( +                *self.args.lv_switch_ports.strip().split(',')) +          self.uhd_app = UHDApp(args=self.args) +        self.log = logging.getLogger("test_phasealignment")          self.tb = phasealignment_fg.phasealignment_fg(self.uhd_app)          self.db = TinyDB('phase_db.json')      def tearDown(self):          self.uhd_app = None          self.tb = None +        if args.lv_host is not None: +            self.siggen.disconnect() +            self.switch.disconnect_all()      def test_001(self):          self.tb.start()          time.sleep(2) -        results = run_test(self.tb,self.args.runs) # dict key:dev, value: dict{dphase:[],stddev:[]} +        results = run_test( +            self.tb, +            self.args.runs)  # dict key:dev, value: dict{dphase:[],stddev:[]}          time.sleep(1) -        self.first_device = self.tb.measurement_channels_names[:-1] -        self.second_device = self.tb.measurement_channels_names[1:]          #self.tb.stop()          #self.tb.wait()          self.time_stamp = time.strftime('%Y%m%d%H%M')          self.passed = True -        for fdev, sdev in zip(self.first_device,self.second_device): -            print('Comparing values for phase difference between {} and {}'.format(fdev, sdev)) -            dphase_list = results[fdev]['avg'] -            dev_list = results[fdev]['stddev'] +        for result in results: +            fdev = result['first'] +            sdev = result['second'] +            self.log.info('Comparing values for phase difference between {} and {}'. +                  format(fdev, sdev)) +            dphase_list = result['avg'] +            dev_list = result['stddev']              dphase = np.average(dphase_list)              dev = np.average(dev_list)              ref_meas = get_reference_meas(self.db, fdev, sdev, self.args.freq)              for dphase_i in dphase_list:                  passed = True                  if abs(dphase_i - dphase) > self.args.dphi and passed: -                    print('\t dPhase of a measurement_run differs from average dhpase. dphase_run: {}, dphase_avg: {}'.format(dphase_i, dphase)) +                    self.log.info( +                        '\t dPhase of a measurement_run differs from average dhpase. dphase_run: {}, dphase_avg: {}'. +                        format(dphase_i, dphase))                      passed = False              if dev > self.args.phasedev: -                print('\t dPhase deviates during measurement. stddev: {}'.format(dev)) +                self.log.info('\t dPhase deviates during measurement. stddev: {}'. +                      format(dev))                  passed = False              if ref_meas:                  if abs(ref_meas['dphase'] - dphase) > self.args.dphi: -                    print('\t dPhase differs from reference measurement. Now: {}, reference: {}'.format(dphase, ref_meas['dphase'])) +                    self.log.info( +                        '\t dPhase differs from reference measurement. Now: {}, reference: {}'. +                        format(dphase, ref_meas['dphase']))              if not passed:                  self.passed = False              else: -                self.db.insert({'dev1':fdev, 'dev2':sdev, 'timestamp':self.time_stamp, 'dphase':dphase, 'dphase_dev': dev, 'freq': self.args.freq}) +                self.db.insert({ +                    'dev1': fdev, +                    'dev2': sdev, +                    'timestamp': self.time_stamp, +                    'dphase': dphase, +                    'dphase_dev': dev, +                    'freq': self.args.freq +                })          self.tb.stop() +        self.tb.wait()          self.assertTrue(self.passed) +  def get_previous_meas(db, dev1, dev2, freq):      meas = Query() -    results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) +    results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq +                                                                     == freq))      prev_result = dict()      if results:          prev_result = results[0] @@ -89,9 +129,11 @@ def get_previous_meas(db, dev1, dev2, freq):                  prev_result = result      return prev_result +  def get_reference_meas(db, dev1, dev2, freq):      meas = Query() -    results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) +    results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq +                                                                     == freq))      ref_result = dict()      if results:          ref_result = results[0] @@ -101,21 +143,32 @@ def get_reference_meas(db, dev1, dev2, freq):      return ref_result -  if __name__ == '__main__': +    # parse all arguments      parser = argparse.ArgumentParser(conflict_handler='resolve')      parser = setup_phase_alignment_parser(parser) +    parser = setup_tx_phase_alignment_parser(parser) +    parser = setup_rts_phase_alignment_parser(parser) +    logging.basicConfig(stream=sys.stdout)      UHDApp.setup_argparser(parser=parser)      args = parser.parse_args() -    def make_suite(testcase_class): +    logging.getLogger("test_phasealignment").setLevel(log_level(args.log_level)) + +    freqlist = args.freqlist.strip().split(',') + +    def make_suite(testcase_class, freqlist):          testloader = unittest.TestLoader()          testnames = testloader.getTestCaseNames(testcase_class)          suite = unittest.TestSuite()          for name in testnames: -            suite.addTest(testcase_class(name, args=args)) +            for freq in freqlist: +                test_args = copy.deepcopy(args) +                test_args.freq = float(freq) +                suite.addTest(testcase_class(name, args=test_args))          return suite      # Add tests.      alltests = unittest.TestSuite() -    alltests.addTest(make_suite(qa_phasealignment)) -    result = unittest.TextTestRunner(verbosity=2).run(alltests) # Run tests. +    alltests.addTest(make_suite(qa_phasealignment, freqlist)) +    result = unittest.TextTestRunner(verbosity=2).run(alltests)  # Run tests. +    sys.exit(not result.wasSuccessful()) | 
