about | summary | refs | log | tree | commit | diff | stats
path: root/tools/gr-usrptest/python/flowgraphs/selftest_fg.py
blob: cdfc35e741309ccbaf9496b8702b57b5e70100a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python

from gnuradio import blocks
from gnuradio import gr
from gnuradio import uhd
from usrptest import phase_calc_ccf
from gnuradio.uhd.uhd_app import UHDApp
import numpy as np

class selftest_fg(gr.top_block):
    """Top block that feeds a complex sine through ``phase_calc_ccf`` blocks.

    Two topologies are built depending on ``devices``:

    * No devices: a reference sine and a copy shifted by ``0.5*pi`` are
      connected to a single ``phase_calc_ccf`` block whose output is collected
      in ``self.result``.
    * One or more devices: for each device a USRP sink transmits the reference
      sine, a USRP source receives, and a per-device ``phase_calc_ccf`` block
      compares the received stream (input 0) with the reference (input 1).
      The outputs are collected in ``self.sink_dict``.
    """

    def __init__(self, freq, samp_rate, dphase, devices=None):
        """Create all blocks and connect them.

        Args:
            freq: Center frequency applied to every USRP source and sink.
                Note that this is stored as ``self.center_freq``;
                ``self.freq`` is the fixed frequency of the generated sine.
            samp_rate: Sample rate used for signal generation and for the
                USRP devices. The generated signals hold ``samp_rate`` samples.
            dphase: Total width of the band around the average phase inside
                which all evaluated samples must lie (see ``run``).
            devices: Iterable of device arguments, one entry per USRP. Each
                entry is passed to ``uhd.usrp_source``/``uhd.usrp_sink`` and
                is also used as the key of the result dictionaries. ``None``
                or an empty list selects the device-less loopback test.
        """
        gr.top_block.__init__(self, "Generate Signal extract phase")

        ##################################################
        # Variables
        ##################################################
        self.samp_rate = samp_rate
        # Frequency of the generated complex sine (fixed, not the RF frequency).
        self.freq = 10e3
        # A None default avoids sharing one list object between all instances
        # that were created without an explicit device list.
        self.devices = [] if devices is None else devices
        self.dphase = dphase
        self.tx_gain = 50
        self.rx_gain = 50
        self.center_freq = freq
        self.omega = 2*np.pi*self.freq
        # Phase argument of the sine for each of the samp_rate samples.
        self.steps = np.arange(
            self.samp_rate)*float(self.omega)/float(self.samp_rate)
        self.reference = self.complex_sine(self.steps)
        # Same sine as the reference, advanced by a quarter period.
        self.test_signal = self.complex_sine(self.steps+0.5*np.pi)
        self.device_test = False

        ##################################################
        # Block dicts
        ##################################################
        self.rx_devices = dict()
        self.tx_devices = dict()
        self.sink_dict = dict()
        self.phase_dict = dict()
        self.reference_source = blocks.vector_source_c(self.reference)

        if self.devices:
            ##################################################
            # Devices
            ##################################################
            self.device_test = True
            for device in self.devices:
                # Create and configure all devices
                # NOTE(review): the keyword is spelled `channel` here while
                # flowgraphs commonly pass `channels=` to uhd.stream_args;
                # confirm which spelling the installed gr-uhd expects.
                self.rx_devices[device] = uhd.usrp_source(
                    device, uhd.stream_args(
                        cpu_format="fc32", channel=range(1)))
                self.tx_devices[device] = uhd.usrp_sink(
                    device, uhd.stream_args(
                        cpu_format="fc32", channel=range(1)))
                self.rx_devices[device].set_samp_rate(self.samp_rate)
                self.rx_devices[device].set_center_freq(self.center_freq, 0)
                self.rx_devices[device].set_gain(self.rx_gain, 0)
                self.tx_devices[device].set_samp_rate(self.samp_rate)
                self.tx_devices[device].set_center_freq(self.center_freq, 0)
                self.tx_devices[device].set_gain(self.tx_gain, 0)
                self.sink_dict[device] = blocks.vector_sink_f()
                self.phase_dict[device] = phase_calc_ccf(
                    self.samp_rate, self.freq)

            # Every transmitter sends the same reference signal.
            for tx_device in self.tx_devices.values():
                self.connect((self.reference_source, 0), (tx_device, 0))

            # Received signal on input 0, reference on input 1, result into
            # the per-device vector sink.
            for device_key, rx_device in self.rx_devices.items():
                phase_block = self.phase_dict[device_key]
                self.connect((rx_device, 0), (phase_block, 0))
                self.connect((self.reference_source, 0), (phase_block, 1))
                self.connect((phase_block, 0), (self.sink_dict[device_key], 0))
        else:
            ##################################################
            # Blocks
            ##################################################
            self.result = blocks.vector_sink_f(1)
            self.test_source = blocks.vector_source_c(self.test_signal)
            self.block_phase_calc = phase_calc_ccf(
                self.samp_rate, self.freq)

            ##################################################
            # Connections
            ##################################################
            self.connect((self.reference_source, 0), (self.block_phase_calc, 1))
            self.connect((self.test_source, 0), (self.block_phase_calc, 0))
            self.connect((self.block_phase_calc, 0), (self.result, 0))

    def complex_sine(self, steps):
        """Return ``exp(1j*steps)`` for the given phase argument(s)."""
        return np.exp(1j*steps)

    def get_samp_rate(self):
        """Return the stored sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate.

        Only the attribute is updated; blocks and signals created in
        ``__init__`` are not reconfigured.
        """
        self.samp_rate = samp_rate

    def run(self):
        """Run the flowgraph to completion and evaluate the device results.

        Returns:
            dict: Maps each device key to the average of its collected phase
            samples. The first and last 20 % of one ``samp_rate`` worth of
            samples are discarded before averaging. A device is only included
            if every remaining sample lies strictly within
            ``average +/- dphase/2``; otherwise an error is printed and the
            device is left out. In the device-less configuration the
            dictionary is empty (the samples are available in
            ``self.result``).
        """
        self.start()
        self.wait()

        # Bound before the branch so the device-less test returns an empty
        # dict instead of raising UnboundLocalError.
        data = dict()
        if not self.device_test:
            return data

        skip = int(0.2*self.samp_rate)
        half_band = self.dphase*0.5
        for device_key, sink in self.sink_dict.items():
            curr_data = sink.data()
            # Explicit end index: a plain [skip:-skip] would yield an empty
            # slice when skip is 0.
            curr_data = curr_data[skip:len(curr_data)-skip]
            if len(curr_data) == 0:
                # np.max/np.min raise on empty input; report and skip instead.
                print("Error not enough samples received")
                continue
            phase_avg = np.average(curr_data)
            settled = (np.max(curr_data) < phase_avg+half_band) and (
                np.min(curr_data) > phase_avg-half_band)
            if settled:
                data[device_key] = phase_avg
            else:
                print("Error phase not settled")
        return data