diff options
Diffstat (limited to 'tools/gr-usrptest/python/rts_tests/test_phasealignment.py')
-rw-r--r-- | tools/gr-usrptest/python/rts_tests/test_phasealignment.py | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/tools/gr-usrptest/python/rts_tests/test_phasealignment.py b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py new file mode 100644 index 000000000..10642f298 --- /dev/null +++ b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2016 Ettus Research LLC. +# +# This is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# +import unittest +from tinydb import TinyDB, Query +from usrptest.flowgraphs import phasealignment_fg +from usrptest.functions import setup_phase_alignment_parser, run_test +from gnuradio.uhd.uhd_app import UHDApp +import numpy as np +import argparse +import time + +class gr_usrp_test(unittest.TestCase): + def __init__(self, methodName='runTest', args=None): + super(gr_usrp_test,self).__init__(methodName) + self.args = args + +class qa_phasealignment(gr_usrp_test): + def setUp(self): + self.uhd_app = UHDApp(args=self.args) + self.tb = phasealignment_fg.phasealignment_fg(self.uhd_app) + self.db = TinyDB('phase_db.json') + + def tearDown(self): + self.uhd_app = None + self.tb = None + + def test_001(self): + self.tb.start() + time.sleep(2) + results = run_test(self.tb,self.args.runs) # dict key:dev, value: dict{dphase:[],stddev:[]} + time.sleep(1) + self.first_device = self.tb.measurement_channels_names[:-1] + self.second_device = self.tb.measurement_channels_names[1:] + #self.tb.stop() + #self.tb.wait() + self.time_stamp = time.strftime('%Y%m%d%H%M') + self.passed = True + for fdev, sdev in zip(self.first_device,self.second_device): + print('Comparing values for phase difference between {} and {}'.format(fdev, sdev)) + dphase_list = results[fdev]['avg'] + dev_list = results[fdev]['stddev'] + dphase = np.average(dphase_list) + dev = np.average(dev_list) + ref_meas = get_reference_meas(self.db, fdev, sdev, self.args.freq) + for dphase_i in dphase_list: + passed = True + if abs(dphase_i - dphase) > self.args.dphi and passed: + print('\t dPhase of a measurement_run differs from average dhpase. dphase_run: {}, dphase_avg: {}'.format(dphase_i, dphase)) + passed = False + if dev > self.args.phasedev: + print('\t dPhase deviates during measurement. stddev: {}'.format(dev)) + passed = False + if ref_meas: + if abs(ref_meas['dphase'] - dphase) > self.args.dphi: + print('\t dPhase differs from reference measurement. Now: {}, reference: {}'.format(dphase, ref_meas['dphase'])) + if not passed: + self.passed = False + else: + self.db.insert({'dev1':fdev, 'dev2':sdev, 'timestamp':self.time_stamp, 'dphase':dphase, 'dphase_dev': dev, 'freq': self.args.freq}) + self.tb.stop() + self.assertTrue(self.passed) + +def get_previous_meas(db, dev1, dev2, freq): + meas = Query() + results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) + prev_result = dict() + if results: + prev_result = results[0] + for result in results: + if result['timestamp'] > prev_result['timestamp']: + prev_result = result + return prev_result + +def get_reference_meas(db, dev1, dev2, freq): + meas = Query() + results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq)) + ref_result = dict() + if results: + ref_result = results[0] + for result in results: + if result['timestamp'] < ref_result['timestamp']: + ref_result = result + return ref_result + + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(conflict_handler='resolve') + parser = setup_phase_alignment_parser(parser) + UHDApp.setup_argparser(parser=parser) + args = parser.parse_args() + def make_suite(testcase_class): + testloader = unittest.TestLoader() + testnames = testloader.getTestCaseNames(testcase_class) + suite = unittest.TestSuite() + for name in testnames: + suite.addTest(testcase_class(name, args=args)) + return suite + + # Add tests. + alltests = unittest.TestSuite() + alltests.addTest(make_suite(qa_phasealignment)) + result = unittest.TextTestRunner(verbosity=2).run(alltests) # Run tests. |