aboutsummaryrefslogtreecommitdiffstats
path: root/tools/gr-usrptest/python/rts_tests/test_phasealignment.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/gr-usrptest/python/rts_tests/test_phasealignment.py')
-rw-r--r--tools/gr-usrptest/python/rts_tests/test_phasealignment.py121
1 files changed, 121 insertions, 0 deletions
diff --git a/tools/gr-usrptest/python/rts_tests/test_phasealignment.py b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py
new file mode 100644
index 000000000..10642f298
--- /dev/null
+++ b/tools/gr-usrptest/python/rts_tests/test_phasealignment.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2016 Ettus Research LLC.
+#
+# This is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+import unittest
+from tinydb import TinyDB, Query
+from usrptest.flowgraphs import phasealignment_fg
+from usrptest.functions import setup_phase_alignment_parser, run_test
+from gnuradio.uhd.uhd_app import UHDApp
+import numpy as np
+import argparse
+import time
+
+class gr_usrp_test(unittest.TestCase):
+ def __init__(self, methodName='runTest', args=None):
+ super(gr_usrp_test,self).__init__(methodName)
+ self.args = args
+
+class qa_phasealignment(gr_usrp_test):
+ def setUp(self):
+ self.uhd_app = UHDApp(args=self.args)
+ self.tb = phasealignment_fg.phasealignment_fg(self.uhd_app)
+ self.db = TinyDB('phase_db.json')
+
+ def tearDown(self):
+ self.uhd_app = None
+ self.tb = None
+
+ def test_001(self):
+ self.tb.start()
+ time.sleep(2)
+ results = run_test(self.tb,self.args.runs) # dict key:dev, value: dict{dphase:[],stddev:[]}
+ time.sleep(1)
+ self.first_device = self.tb.measurement_channels_names[:-1]
+ self.second_device = self.tb.measurement_channels_names[1:]
+ #self.tb.stop()
+ #self.tb.wait()
+ self.time_stamp = time.strftime('%Y%m%d%H%M')
+ self.passed = True
+ for fdev, sdev in zip(self.first_device,self.second_device):
+ print('Comparing values for phase difference between {} and {}'.format(fdev, sdev))
+ dphase_list = results[fdev]['avg']
+ dev_list = results[fdev]['stddev']
+ dphase = np.average(dphase_list)
+ dev = np.average(dev_list)
+ ref_meas = get_reference_meas(self.db, fdev, sdev, self.args.freq)
+ for dphase_i in dphase_list:
+ passed = True
+ if abs(dphase_i - dphase) > self.args.dphi and passed:
+ print('\t dPhase of a measurement_run differs from average dhpase. dphase_run: {}, dphase_avg: {}'.format(dphase_i, dphase))
+ passed = False
+ if dev > self.args.phasedev:
+ print('\t dPhase deviates during measurement. stddev: {}'.format(dev))
+ passed = False
+ if ref_meas:
+ if abs(ref_meas['dphase'] - dphase) > self.args.dphi:
+ print('\t dPhase differs from reference measurement. Now: {}, reference: {}'.format(dphase, ref_meas['dphase']))
+ if not passed:
+ self.passed = False
+ else:
+ self.db.insert({'dev1':fdev, 'dev2':sdev, 'timestamp':self.time_stamp, 'dphase':dphase, 'dphase_dev': dev, 'freq': self.args.freq})
+ self.tb.stop()
+ self.assertTrue(self.passed)
+
+def get_previous_meas(db, dev1, dev2, freq):
+ meas = Query()
+ results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq))
+ prev_result = dict()
+ if results:
+ prev_result = results[0]
+ for result in results:
+ if result['timestamp'] > prev_result['timestamp']:
+ prev_result = result
+ return prev_result
+
+def get_reference_meas(db, dev1, dev2, freq):
+ meas = Query()
+ results = db.search((meas.dev1 == dev1) & (meas.dev2 == dev2) & (meas.freq == freq))
+ ref_result = dict()
+ if results:
+ ref_result = results[0]
+ for result in results:
+ if result['timestamp'] < ref_result['timestamp']:
+ ref_result = result
+ return ref_result
+
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(conflict_handler='resolve')
+ parser = setup_phase_alignment_parser(parser)
+ UHDApp.setup_argparser(parser=parser)
+ args = parser.parse_args()
+ def make_suite(testcase_class):
+ testloader = unittest.TestLoader()
+ testnames = testloader.getTestCaseNames(testcase_class)
+ suite = unittest.TestSuite()
+ for name in testnames:
+ suite.addTest(testcase_class(name, args=args))
+ return suite
+
+ # Add tests.
+ alltests = unittest.TestSuite()
+ alltests.addTest(make_suite(qa_phasealignment))
+ result = unittest.TextTestRunner(verbosity=2).run(alltests) # Run tests.