diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-02-04 18:26:35 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-02-04 18:26:35 +0100 |
commit | ec992e8e61172bacf515483c8a8784335ca8eb5c (patch) | |
tree | ed701692d4ae93caffc2ce64bd79a9a21aa5b7e0 | |
parent | 936db3485eabc692e29f4249349fc351aad18675 (diff) | |
download | odr-dpd-ec992e8e61172bacf515483c8a8784335ca8eb5c.tar.gz odr-dpd-ec992e8e61172bacf515483c8a8784335ca8eb5c.tar.bz2 odr-dpd-ec992e8e61172bacf515483c8a8784335ca8eb5c.zip |
Replace code and calculate fractional delay instead
-rwxr-xr-x | align/GenerateExampleTxRxIQ.py | 142 |
1 files changed, 54 insertions, 88 deletions
diff --git a/align/GenerateExampleTxRxIQ.py b/align/GenerateExampleTxRxIQ.py index dc8b572..448b859 100755 --- a/align/GenerateExampleTxRxIQ.py +++ b/align/GenerateExampleTxRxIQ.py @@ -26,6 +26,7 @@ # SOFTWARE. import matplotlib.pyplot as plt +import scipy import scipy.signal import numpy as np @@ -35,118 +36,83 @@ import numpy as np do_integer_compensation = 0 # by how much to oversample the signal before applying the delay -oversample = 16 +oversample = 8 # Add a delay of delay/oversample samples to the input signal -delay = 7 +#delay = 7 -print("Expecting a delay of {} samples".format(delay/oversample)) +do_plot = False -## Generate signal +def gen_signals(delay): + ## Generate signal -iq_file = "/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq" + iq_file = "/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq" -iq_data = np.fromfile(iq_file, np.complex64) + iq_data = np.fromfile(iq_file, np.complex64) -# oversampling the input signal doesn't make much of a difference -phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data)) + # oversampling the input signal doesn't make much of a difference + phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data)) -# make the signal periodic by duplicating the signal -phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq)) + # make the signal periodic by duplicating the signal + phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq)) -noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02, - size=len(phase_ref_iq)) + noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02, + size=len(phase_ref_iq)) -phase_ref_iq = phase_ref_iq + noise_iq + phase_ref_iq = phase_ref_iq + noise_iq -# exp(-2i pi f) is the Fourier transform of a unity delay. -# exp(2i pi f) is a negative delay. -bin_frequencies = np.concatenate( - (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False), - np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False))) + # exp(-2i pi f) is the Fourier transform of a unity delay. + # exp(2i pi f) is a negative delay. + bin_frequencies = np.concatenate( + (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False), + np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False))) -phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq)) + phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq)) + phase_ref_uc_delayed = np.roll(phase_ref_uc, -delay) -phase_ref_uc_delayed = np.roll(phase_ref_uc, delay) + phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq)) + return phase_ref_iq, phase_ref_delayed -phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq)) +def arg_max_corr(a, b): + """Calculate fractional delay giving max correlation between a and b""" + if len(a.shape) > 1: + raise ValueError('Needs a 1-dimensional array.') -## Integer delay -corr_begin_ix = -32 -corr_end_ix = 32 + length = len(a) + if not length % 2 == 0: + raise ValueError('Needs an even length array.') + halflength = int(length/2) -corr = [np.abs(np.corrcoef(phase_ref_delayed, np.roll(phase_ref_iq, i))[0,1]) - for i in range(corr_begin_ix, corr_end_ix)] -# TODO check for negative real correlation peak -if do_integer_compensation: - delay_in = np.argmax(corr) + corr_begin_ix -else: - delay_in = 0 + if not a.shape == b.shape: + raise ValueError('The 2 arrays need to be the same shape') -phase_ref_int_delay_removed = np.roll(phase_ref_delayed, -delay_in) + # Start by finding the coarse discretised arg_max + coarse_max = np.argmax(np.correlate(a, b, mode='full')) - length+1 -print("Integer delay corrected: {}".format(delay_in)) + omega = np.zeros(length) + omega[0:halflength] = (2*np.pi*np.arange(halflength))/length + omega[halflength+1:] = (2*np.pi* + (np.arange(halflength+1, length)-length))/length -## Fractional delay -signal_fft = np.fft.fft(phase_ref_int_delay_removed) -reference_fft = np.fft.fft(phase_ref_iq) + fft_a = np.fft.fft(a) -# rotate each bin backwards with the phase of the reference. As we have already resolved the -# integer delay, we should find at most one 2*pi wrapping. -u = signal_fft * np.conj(reference_fft) + def correlate_point(tau): + rotate_vec = np.exp(1j*tau*omega) + rotate_vec[halflength] = np.cos(np.pi*tau) + return np.sum((np.fft.ifft(fft_a*rotate_vec)).real*b) -# the phase signal will still wrap around, and will have values between -pi/4 and pi/4 -phase_wrapping = np.angle(u) + start_arg, end_arg = (float(coarse_max)-1, float(coarse_max)+1) -unwrap_with_deriv_integrate = True -if unwrap_with_deriv_integrate: - # to unwrap, take the derivative, remove peaks, integrate - phase_deriv = phase_wrapping - np.roll(phase_wrapping, 1) + max_arg = scipy.optimize.fminbound(lambda tau: -correlate_point(tau), + start_arg, end_arg) - def filter_phase_deriv(p): - if np.abs(p) < 0.5: - return p - else: - return 0 + return np.real(max_arg) - phase_deriv_nopeaks = [filter_phase_deriv(p) for p in phase_deriv] - phase_unwrapped = np.cumsum(phase_deriv_nopeaks) -else: - # doesn't always work, sometimes there are smaller jumps in phase - phase_unwrapped = np.mod(phase_wrapping + np.pi/2, np.ones(len(phase_wrapping)) * np.pi / 2) +for d in [2, 5, 9, 15, 34, 120]: + a, b = gen_signals(d) + print("{} {}".format(d / oversample, arg_max_corr(a,b))) - -# Find the slope using a linear regression -slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(phase_unwrapped,range(len(phase_unwrapped))) - -if p_value < 0.05: - frac_delay = slope / len(phase_unwrapped) - frac_rotate = intercept / len(phase_unwrapped) - print("Applying subsample correction: {} {}".format(frac_delay, frac_rotate)) - print(slope, intercept, r_value, p_value, std_err) -else: - print("Skipping subsample correction") - print(slope, intercept, r_value, p_value, std_err) - frac_delay = None - -plt.figure() -plt.plot(phase_wrapping) -plt.plot(phase_unwrapped) - - -if frac_delay: - fine_shift_fft = np.exp((0+2j * np.pi * frac_delay) * bin_frequencies) * np.exp(0+2j * np.pi * frac_rotate) - sig_delay_removed_fft = signal_fft * fine_shift_fft - - sig_delay_removed = np.fft.ifft(sig_delay_removed_fft) - - plt.figure() - plt.plot(np.angle(np.fft.fftshift(fine_shift_fft))) - - plt.figure() - plt.plot(np.abs(np.fft.fftshift(np.fft.fft(phase_ref_iq)))) - plt.plot(np.abs(np.fft.fftshift(np.fft.fft(sig_delay_removed-phase_ref_iq)))) - -plt.show() +if do_plot: + plt.show() |