1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
#!/usr/bin/env python
from gnuradio import blocks
from gnuradio import gr
from gnuradio import uhd
from usrptest import phase_calc_ccf
from gnuradio.uhd.uhd_app import UHDApp
import numpy as np
class selftest_fg(gr.top_block):
def __init__(self, freq, samp_rate, dphase, devices=list()):
gr.top_block.__init__(self, "Generate Signal extract phase")
##################################################
# Variables
##################################################
self.samp_rate = samp_rate
self.freq = 10e3
self.devices = devices
self.dphase = dphase
self.tx_gain = 50
self.rx_gain = 50
self.center_freq = freq
self.omega = 2*np.pi*self.freq
self.steps = np.arange(
self.samp_rate)*float(self.omega)/float(self.samp_rate)
self.reference = self.complex_sine(self.steps)
self.test_signal = self.complex_sine(self.steps+0.5*np.pi)
self.device_test = False
##################################################
# Block dicts
##################################################
self.rx_devices = dict()
self.tx_devices = dict()
self.sink_dict = dict()
self.phase_dict = dict()
self.reference_source = blocks.vector_source_c(self.reference)
if len(self.devices):
##################################################
# Devices
##################################################
self.device_test = True
#To reuse existing setup_usrp() command
for device in self.devices:
# Create and configure all devices
self.rx_devices[device] = uhd.usrp_source(
device, uhd.stream_args(
cpu_format="fc32", channel=range(1)))
self.tx_devices[device] = uhd.usrp_sink(
device, uhd.stream_args(
cpu_format="fc32", channel=range(1)))
self.rx_devices[device].set_samp_rate(self.samp_rate)
self.rx_devices[device].set_center_freq(self.center_freq, 0)
self.rx_devices[device].set_gain(self.rx_gain, 0)
self.tx_devices[device].set_samp_rate(self.samp_rate)
self.tx_devices[device].set_center_freq(self.center_freq, 0)
self.tx_devices[device].set_gain(self.tx_gain, 0)
self.sink_dict[device] = blocks.vector_sink_f()
self.phase_dict[device] = phase_calc_ccf(
self.samp_rate, self.freq)
for device in self.tx_devices.values():
self.connect((self.reference_source, 0), (device, 0))
for device_key in self.rx_devices.keys():
self.connect(
(self.rx_devices[device_key], 0), (self.phase_dict[device_key], 0))
self.connect((self.reference_source, 0),
(self.phase_dict[device_key], 1))
self.connect(
(self.phase_dict[device_key], 0), (self.sink_dict[device_key], 0))
# Debug options
# self.sink_list.append(blocks.vector_sink_c())
#self.connect((device, 0), (self.sink_list[-1], 0))
# self.sink_list.append(blocks.vector_sink_c())
#self.connect((self.reference_source, 0), (self.sink_list[-1], 0))
else:
##################################################
# Blocks
##################################################
self.result = blocks.vector_sink_f(1)
self.test_source = blocks.vector_source_c(self.test_signal)
self.block_phase_calc = phase_calc_ccf(
self.samp_rate, self.freq)
##################################################
# Connections
##################################################
self.connect((self.reference_source, 0), (self.block_phase_calc, 1))
self.connect((self.test_source, 0), (self.block_phase_calc, 0))
self.connect((self.block_phase_calc, 0), (self.result, 0))
def complex_sine(self, steps):
return np.exp(1j*steps)
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
def run(self):
self.start()
self.wait()
if self.device_test:
data = dict()
for device_key in self.sink_dict.keys():
curr_data = self.sink_dict[device_key].data()
curr_data = curr_data[int(0.2*self.samp_rate):-int(0.2*self.samp_rate)]
phase_avg = np.average(curr_data)
if (np.max(curr_data) < phase_avg+self.dphase*0.5) and (np.min(curr_data) > phase_avg-self.dphase*0.5):
data[device_key] = phase_avg
else:
print("Error phase not settled")
#Debug
# plt.ylim(-1, 1)
# plt.xlim(self.samp_rate/2.), (self.samp_rate/2.)+1000)
#for key in data:
# plt.plot(data[key])
return data
|