aboutsummaryrefslogtreecommitdiffstats
path: root/tools/gr-usrptest/python/flowgraphs/selftest_fg.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/gr-usrptest/python/flowgraphs/selftest_fg.py')
-rw-r--r--tools/gr-usrptest/python/flowgraphs/selftest_fg.py122
1 files changed, 122 insertions, 0 deletions
diff --git a/tools/gr-usrptest/python/flowgraphs/selftest_fg.py b/tools/gr-usrptest/python/flowgraphs/selftest_fg.py
new file mode 100644
index 000000000..cdfc35e74
--- /dev/null
+++ b/tools/gr-usrptest/python/flowgraphs/selftest_fg.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+
+from gnuradio import blocks
+from gnuradio import gr
+from gnuradio import uhd
+from usrptest import phase_calc_ccf
+from gnuradio.uhd.uhd_app import UHDApp
+import numpy as np
+
+class selftest_fg(gr.top_block):
+
+ def __init__(self, freq, samp_rate, dphase, devices=list()):
+ gr.top_block.__init__(self, "Generate Signal extract phase")
+
+ ##################################################
+ # Variables
+ ##################################################
+ self.samp_rate = samp_rate
+ self.freq = 10e3
+ self.devices = devices
+ self.dphase = dphase
+ self.tx_gain = 50
+ self.rx_gain = 50
+ self.center_freq = freq
+ self.omega = 2*np.pi*self.freq
+ self.steps = np.arange(
+ self.samp_rate)*float(self.omega)/float(self.samp_rate)
+ self.reference = self.complex_sine(self.steps)
+ self.test_signal = self.complex_sine(self.steps+0.5*np.pi)
+ self.device_test = False
+
+ ##################################################
+ # Block dicts
+ ##################################################
+ self.rx_devices = dict()
+ self.tx_devices = dict()
+ self.sink_dict = dict()
+ self.phase_dict = dict()
+ self.reference_source = blocks.vector_source_c(self.reference)
+
+ if len(self.devices):
+ ##################################################
+ # Devices
+ ##################################################
+ self.device_test = True
+ #To reuse existing setup_usrp() command
+ for device in self.devices:
+ # Create and configure all devices
+ self.rx_devices[device] = uhd.usrp_source(
+ device, uhd.stream_args(
+ cpu_format="fc32", channel=range(1)))
+ self.tx_devices[device] = uhd.usrp_sink(
+ device, uhd.stream_args(
+ cpu_format="fc32", channel=range(1)))
+ self.rx_devices[device].set_samp_rate(self.samp_rate)
+ self.rx_devices[device].set_center_freq(self.center_freq, 0)
+ self.rx_devices[device].set_gain(self.rx_gain, 0)
+ self.tx_devices[device].set_samp_rate(self.samp_rate)
+ self.tx_devices[device].set_center_freq(self.center_freq, 0)
+ self.tx_devices[device].set_gain(self.tx_gain, 0)
+ self.sink_dict[device] = blocks.vector_sink_f()
+ self.phase_dict[device] = phase_calc_ccf(
+ self.samp_rate, self.freq)
+ for device in self.tx_devices.values():
+ self.connect((self.reference_source, 0), (device, 0))
+
+ for device_key in self.rx_devices.keys():
+ self.connect(
+ (self.rx_devices[device_key], 0), (self.phase_dict[device_key], 0))
+ self.connect((self.reference_source, 0),
+ (self.phase_dict[device_key], 1))
+ self.connect(
+ (self.phase_dict[device_key], 0), (self.sink_dict[device_key], 0))
+ # Debug options
+ # self.sink_list.append(blocks.vector_sink_c())
+ #self.connect((device, 0), (self.sink_list[-1], 0))
+ # self.sink_list.append(blocks.vector_sink_c())
+ #self.connect((self.reference_source, 0), (self.sink_list[-1], 0))
+ else:
+ ##################################################
+ # Blocks
+ ##################################################
+ self.result = blocks.vector_sink_f(1)
+ self.test_source = blocks.vector_source_c(self.test_signal)
+ self.block_phase_calc = phase_calc_ccf(
+ self.samp_rate, self.freq)
+
+ ##################################################
+ # Connections
+ ##################################################
+ self.connect((self.reference_source, 0), (self.block_phase_calc, 1))
+ self.connect((self.test_source, 0), (self.block_phase_calc, 0))
+ self.connect((self.block_phase_calc, 0), (self.result, 0))
+ def complex_sine(self, steps):
+ return np.exp(1j*steps)
+
+ def get_samp_rate(self):
+ return self.samp_rate
+
+ def set_samp_rate(self, samp_rate):
+ self.samp_rate = samp_rate
+
+ def run(self):
+ self.start()
+ self.wait()
+ if self.device_test:
+ data = dict()
+ for device_key in self.sink_dict.keys():
+ curr_data = self.sink_dict[device_key].data()
+ curr_data = curr_data[int(0.2*self.samp_rate):-int(0.2*self.samp_rate)]
+ phase_avg = np.average(curr_data)
+ if (np.max(curr_data) < phase_avg+self.dphase*0.5) and (np.min(curr_data) > phase_avg-self.dphase*0.5):
+ data[device_key] = phase_avg
+ else:
+ print("Error phase not settled")
+
+ #Debug
+ # plt.ylim(-1, 1)
+ # plt.xlim(self.samp_rate/2.), (self.samp_rate/2.)+1000)
+ #for key in data:
+ # plt.plot(data[key])
+ return data