aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-02-04 18:26:35 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-02-04 18:26:35 +0100
commitec992e8e61172bacf515483c8a8784335ca8eb5c (patch)
treeed701692d4ae93caffc2ce64bd79a9a21aa5b7e0
parent936db3485eabc692e29f4249349fc351aad18675 (diff)
downloadodr-dpd-ec992e8e61172bacf515483c8a8784335ca8eb5c.tar.gz
odr-dpd-ec992e8e61172bacf515483c8a8784335ca8eb5c.tar.bz2
odr-dpd-ec992e8e61172bacf515483c8a8784335ca8eb5c.zip
Replace code and calculate fractional delay instead
-rwxr-xr-xalign/GenerateExampleTxRxIQ.py142
1 files changed, 54 insertions, 88 deletions
diff --git a/align/GenerateExampleTxRxIQ.py b/align/GenerateExampleTxRxIQ.py
index dc8b572..448b859 100755
--- a/align/GenerateExampleTxRxIQ.py
+++ b/align/GenerateExampleTxRxIQ.py
@@ -26,6 +26,7 @@
# SOFTWARE.
import matplotlib.pyplot as plt
+import scipy
import scipy.signal
import numpy as np
@@ -35,118 +36,83 @@ import numpy as np
do_integer_compensation = 0
# by how much to oversample the signal before applying the delay
-oversample = 16
+oversample = 8
# Add a delay of delay/oversample samples to the input signal
-delay = 7
+#delay = 7
-print("Expecting a delay of {} samples".format(delay/oversample))
+do_plot = False
-## Generate signal
+def gen_signals(delay):
+ ## Generate signal
-iq_file = "/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq"
+ iq_file = "/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq"
-iq_data = np.fromfile(iq_file, np.complex64)
+ iq_data = np.fromfile(iq_file, np.complex64)
-# oversampling the input signal doesn't make much of a difference
-phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data))
+ # oversampling the input signal doesn't make much of a difference
+ phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data))
-# make the signal periodic by duplicating the signal
-phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq))
+ # make the signal periodic by duplicating the signal
+ phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq))
-noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02,
- size=len(phase_ref_iq))
+ noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02,
+ size=len(phase_ref_iq))
-phase_ref_iq = phase_ref_iq + noise_iq
+ phase_ref_iq = phase_ref_iq + noise_iq
-# exp(-2i pi f) is the Fourier transform of a unity delay.
-# exp(2i pi f) is a negative delay.
-bin_frequencies = np.concatenate(
- (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False),
- np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False)))
+ # exp(-2i pi f) is the Fourier transform of a unity delay.
+ # exp(2i pi f) is a negative delay.
+ bin_frequencies = np.concatenate(
+ (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False),
+ np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False)))
-phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq))
+ phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq))
+ phase_ref_uc_delayed = np.roll(phase_ref_uc, -delay)
-phase_ref_uc_delayed = np.roll(phase_ref_uc, delay)
+ phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq))
+ return phase_ref_iq, phase_ref_delayed
-phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq))
+def arg_max_corr(a, b):
+ """Calculate fractional delay giving max correlation between a and b"""
+ if len(a.shape) > 1:
+ raise ValueError('Needs a 1-dimensional array.')
-## Integer delay
-corr_begin_ix = -32
-corr_end_ix = 32
+ length = len(a)
+ if not length % 2 == 0:
+ raise ValueError('Needs an even length array.')
+ halflength = int(length/2)
-corr = [np.abs(np.corrcoef(phase_ref_delayed, np.roll(phase_ref_iq, i))[0,1])
- for i in range(corr_begin_ix, corr_end_ix)]
-# TODO check for negative real correlation peak
-if do_integer_compensation:
- delay_in = np.argmax(corr) + corr_begin_ix
-else:
- delay_in = 0
+ if not a.shape == b.shape:
+ raise ValueError('The 2 arrays need to be the same shape')
-phase_ref_int_delay_removed = np.roll(phase_ref_delayed, -delay_in)
+ # Start by finding the coarse discretised arg_max
+ coarse_max = np.argmax(np.correlate(a, b, mode='full')) - length+1
-print("Integer delay corrected: {}".format(delay_in))
+ omega = np.zeros(length)
+ omega[0:halflength] = (2*np.pi*np.arange(halflength))/length
+ omega[halflength+1:] = (2*np.pi*
+ (np.arange(halflength+1, length)-length))/length
-## Fractional delay
-signal_fft = np.fft.fft(phase_ref_int_delay_removed)
-reference_fft = np.fft.fft(phase_ref_iq)
+ fft_a = np.fft.fft(a)
-# rotate each bin backwards with the phase of the reference. As we have already resolved the
-# integer delay, we should find at most one 2*pi wrapping.
-u = signal_fft * np.conj(reference_fft)
+ def correlate_point(tau):
+ rotate_vec = np.exp(1j*tau*omega)
+ rotate_vec[halflength] = np.cos(np.pi*tau)
+ return np.sum((np.fft.ifft(fft_a*rotate_vec)).real*b)
-# the phase signal will still wrap around, and will have values between -pi/4 and pi/4
-phase_wrapping = np.angle(u)
+ start_arg, end_arg = (float(coarse_max)-1, float(coarse_max)+1)
-unwrap_with_deriv_integrate = True
-if unwrap_with_deriv_integrate:
- # to unwrap, take the derivative, remove peaks, integrate
- phase_deriv = phase_wrapping - np.roll(phase_wrapping, 1)
+ max_arg = scipy.optimize.fminbound(lambda tau: -correlate_point(tau),
+ start_arg, end_arg)
- def filter_phase_deriv(p):
- if np.abs(p) < 0.5:
- return p
- else:
- return 0
+ return np.real(max_arg)
- phase_deriv_nopeaks = [filter_phase_deriv(p) for p in phase_deriv]
- phase_unwrapped = np.cumsum(phase_deriv_nopeaks)
-else:
- # doesn't always work, sometimes there are smaller jumps in phase
- phase_unwrapped = np.mod(phase_wrapping + np.pi/2, np.ones(len(phase_wrapping)) * np.pi / 2)
+for d in [2, 5, 9, 15, 34, 120]:
+ a, b = gen_signals(d)
+ print("{} {}".format(d / oversample, arg_max_corr(a,b)))
-
-# Find the slope using a linear regression
-slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(phase_unwrapped,range(len(phase_unwrapped)))
-
-if p_value < 0.05:
- frac_delay = slope / len(phase_unwrapped)
- frac_rotate = intercept / len(phase_unwrapped)
- print("Applying subsample correction: {} {}".format(frac_delay, frac_rotate))
- print(slope, intercept, r_value, p_value, std_err)
-else:
- print("Skipping subsample correction")
- print(slope, intercept, r_value, p_value, std_err)
- frac_delay = None
-
-plt.figure()
-plt.plot(phase_wrapping)
-plt.plot(phase_unwrapped)
-
-
-if frac_delay:
- fine_shift_fft = np.exp((0+2j * np.pi * frac_delay) * bin_frequencies) * np.exp(0+2j * np.pi * frac_rotate)
- sig_delay_removed_fft = signal_fft * fine_shift_fft
-
- sig_delay_removed = np.fft.ifft(sig_delay_removed_fft)
-
- plt.figure()
- plt.plot(np.angle(np.fft.fftshift(fine_shift_fft)))
-
- plt.figure()
- plt.plot(np.abs(np.fft.fftshift(np.fft.fft(phase_ref_iq))))
- plt.plot(np.abs(np.fft.fftshift(np.fft.fft(sig_delay_removed-phase_ref_iq))))
-
-plt.show()
+if do_plot:
+ plt.show()