aboutsummaryrefslogtreecommitdiffstats
path: root/tools/gr-usrptest/python/functions.py
blob: e3f7958b1433452bb70a19fce237396c96eceb29 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python2
import numpy as np
import time
import copy
import logging


def setup_phase_alignment_parser(parser):
    """Add the common phase alignment test options to ``parser``.

    An argument group titled 'Phase alignment specific arguments' is created
    on ``parser`` and these options are registered in it, in this order:

    * ``--runs`` (int, default 10)
    * ``--duration`` (float, default 5.0)
    * ``--measurement-setup`` (str, no default)
    * ``--log-level`` (one of critical/error/warning/info/debug,
      default "info")
    * ``--freq-bands`` (int, default 1)

    :param parser: object providing ``add_argument_group`` (for example an
        ``argparse.ArgumentParser``)
    :returns: the same ``parser`` object that was passed in
    """
    group = parser.add_argument_group('Phase alignment specific arguments')

    # One entry per option: (flag, keyword arguments for add_argument).
    # The order of the entries is the registration order.
    option_table = (
        ('--runs', {
            'type': int,
            'default': 10,
            'help': 'Number of times to retune and measure d_phi',
        }),
        ('--duration', {
            'type': float,
            'default': 5.0,
            'help': 'Duration of a measurement run',
        }),
        ('--measurement-setup', {
            'type': str,
            'help': ('Comma-seperated list of channel ids. '
                     'Phase difference will be calculated between '
                     'consecutive channels. '
                     'default=(0,1,2,..,M-1) M: num_chan'),
        }),
        ('--log-level', {
            'type': str,
            'choices': ["critical", "error", "warning", "info", "debug"],
            'default': "info",
        }),
        ('--freq-bands', {
            'type': int,
            'default': 1,
            'help': "Number of frequency bands in daughterboard range to randomly retune to",
        }),
    )
    for flag, settings in option_table:
        group.add_argument(flag, **settings)
    return parser


def setup_tx_phase_alignment_parser(parser):
    """Add the TX-side phase alignment options to ``parser``.

    An argument group titled 'TX Phase alignment specific arguments.' is
    created on ``parser``; ``--tx-channels`` (str), ``--tx-antenna`` (str)
    and ``--tx-offset`` (float) are registered in it. None of them has an
    explicit default.

    :param parser: object providing ``add_argument_group`` (for example an
        ``argparse.ArgumentParser``)
    :returns: the same ``parser`` object that was passed in
    """
    group = parser.add_argument_group(
        'TX Phase alignment specific arguments.')

    # (flag, value type, help text) in registration order.
    tx_options = (
        ('--tx-channels', str, 'which channels to use'),
        ('--tx-antenna', str,
         'comma-separated list of channel antennas for tx'),
        ('--tx-offset', float,
         'frequency offset in Hz which should be added to center frequency for transmission'),
    )
    for flag, value_type, description in tx_options:
        group.add_argument(flag, type=value_type, help=description)
    return parser


def setup_rts_phase_alignment_parser(parser):
    """Add the RTS phase alignment thresholds to ``parser``.

    An argument group titled 'RTS Phase alignment specific arguments' is
    created on ``parser`` with two float options:

    * ``-pd`` / ``--phasedev`` (default 1.0)
    * ``-dp`` / ``--dphi`` (default 2.0)

    :param parser: object providing ``add_argument_group`` (for example an
        ``argparse.ArgumentParser``)
    :returns: the same ``parser`` object that was passed in
    """
    group = parser.add_argument_group('RTS Phase alignment specific arguments')

    # (option flags, keyword arguments for add_argument), registration order.
    threshold_options = (
        (('-pd', '--phasedev'), {
            'type': float,
            'default': 1.0,
            'help': 'maximum phase standard deviation of dphi in a run which is considered settled (in deg)',
        }),
        (('-dp', '--dphi'), {
            'type': float,
            'default': 2.0,
            'help': 'maximum allowed d_phase deviation between runs (in deg)',
        }),
    )
    for flags, settings in threshold_options:
        group.add_argument(*flags, **settings)
    return parser

def setup_manual_phase_alignment_parser(parser):
    """Add the manual phase alignment test options to ``parser``.

    An argument group titled 'Manual Phase alignment specific arguments' is
    created on ``parser`` and these options are registered in it:

    * ``--plot`` (flag, stored as ``plot``)
    * ``--auto`` (flag, stored as ``auto``)
    * ``--start-freq`` (float, default 0.0)
    * ``--stop-freq`` (float, default 0.0)

    Afterwards ``plot`` and ``auto`` are given parser-level defaults of
    ``False``.

    :param parser: object providing ``add_argument_group`` and
        ``set_defaults`` (for example an ``argparse.ArgumentParser``)
    :returns: the same ``parser`` object that was passed in
    """
    manual_group = parser.add_argument_group(
        'Manual Phase alignment specific arguments')
    manual_group.add_argument(
        '--plot',
        dest='plot',
        action='store_true',
        help='Set this argument to enable plotting results with matplotlib')
    manual_group.add_argument(
        '--auto',
        action='store_true',
        help='Set this argument to enable automatic selection of test frequencies')
    # This call used to be followed by a stray trailing comma, which turned
    # the statement into a discarded one-element tuple.
    manual_group.add_argument(
        '--start-freq',
        type=float,
        default=0.0,
        help='Start frequency for automatic selection')
    manual_group.add_argument(
        '--stop-freq',
        type=float,
        default=0.0,
        help='Stop frequency for automatic selection')

    # Both switches are off unless given on the command line.
    parser.set_defaults(plot=False, auto=False)
    return parser


def process_measurement_sinks(top_block):
    """Collect the results of every measurement sink of ``top_block``.

    One result dict is produced for each entry of
    ``top_block.measurement_channels`` except the last one. The dict at
    position ``idx`` contains:

    * ``'avg'``: ``list(top_block.measurement_sink[idx].get_avg())``
    * ``'stddev'``: ``list(top_block.measurement_sink[idx].get_stddev())``
    * ``'first'``: ``top_block.measurement_channels_names[idx]``
    * ``'second'``: ``top_block.measurement_channels_names[idx + 1]``

    :param top_block: object exposing ``measurement_channels``,
        ``measurement_sink`` and ``measurement_channels_names``
    :returns: list of result dicts, one per pair of consecutive channels
    """
    collected = []
    # Only the position matters here; the channel entries are never read.
    for idx, _ in enumerate(top_block.measurement_channels[:-1]):
        sink = top_block.measurement_sink[idx]
        # A fresh dict per pair, so entries never share state.
        collected.append({
            'avg': list(sink.get_avg()),
            'stddev': list(sink.get_stddev()),
            'first': top_block.measurement_channels_names[idx],
            'second': top_block.measurement_channels_names[idx + 1],
        })
    return collected


def run_test(top_block, ntimes):
    """Run ``ntimes`` retune-and-measure cycles and return the sink results.

    Each cycle calls ``top_block.retune_frequency``, starts a run on every
    entry of ``top_block.measurement_sink`` and then polls once per second
    until the ``get_run()`` values of all sinks sum to at least
    ``(cycle + 1) * num_sinks``. There is no timeout: the call blocks for
    as long as the sinks do not reach that count.

    :param top_block: object exposing ``measurement_sink``,
        ``retune_frequency`` and ``uhd_app.args.freq_bands``
    :param ntimes: number of retune/measure cycles to perform
    :returns: the result of ``process_measurement_sinks(top_block)``
    """
    num_sinks = len(top_block.measurement_sink)
    # range() rather than the Python-2-only xrange(), so the loop runs on
    # both Python 2 and Python 3.
    for cycle in range(ntimes):
        # Tune frequency to a random position and back to the specified
        # frequency.
        top_block.retune_frequency(
            bands=top_block.uhd_app.args.freq_bands, band_num=cycle + 1)
        # NOTE(review): presumably a settling delay after retuning - confirm.
        time.sleep(2)
        # Trigger start in all measurement sinks.
        for sink in top_block.measurement_sink:
            sink.start_run()
        # Wait until every measurement sink is ready with the current run.
        runs_expected = (cycle + 1) * num_sinks
        while sum(ms.get_run()
                  for ms in top_block.measurement_sink) < runs_expected:
            time.sleep(1)
    return process_measurement_sinks(top_block)


def log_level(string):
    """Return the attribute of the ``logging`` module named by ``string``.

    The name is upper-cased before the lookup, so ``"info"`` yields
    ``logging.INFO``.

    :param string: name to look up, in any letter case
    :returns: the value of ``logging.<STRING>``
    :raises AttributeError: if ``logging`` has no attribute with that name
    """
    level_name = string.upper()
    return getattr(logging, level_name)