#!/usr/bin/env python # # Generate an example RX and TX dataset, with a subsample delay and try to resolve it afterwards # # # The MIT License (MIT) # # Copyright (c) 2017 Matthias P. Braendli # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. import matplotlib.pyplot as plt import scipy import scipy.signal import numpy as np def gen_omega(length): if not length % 2 == 0: raise ValueError('Needs an even length array.') halflength = int(length/2) omega = np.zeros(length) omega[0:halflength] = 2*np.pi*np.arange(halflength) omega[halflength+1:] = 2*np.pi*( np.arange(halflength+1, length)-length ) return omega / length def gen_signals(oversample, delay): """Generate a signal that is delayed and a bit noisy. Returns a tuple (original signal, shifted signal)""" iq_file = "/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq" iq_data = np.fromfile(iq_file, np.complex64) # oversampling the input signal doesn't make much of a difference phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data)) # make the signal periodic by duplicating the signal phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq)) noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02, size=len(phase_ref_iq)) phase_ref_iq = phase_ref_iq + noise_iq # exp(-2i pi f) is the Fourier transform of a unity delay. # exp(2i pi f) is a negative delay. bin_frequencies = np.concatenate( (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False), np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False))) phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq)) phase_ref_uc_delayed = np.roll(phase_ref_uc, -delay) phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq)) return phase_ref_iq, phase_ref_delayed def arg_max_corr(a, b): """Calculate fractional delay giving max correlation between a and b""" if len(a.shape) > 1: raise ValueError('Needs a 1-dimensional array.') length = len(a) if not length % 2 == 0: raise ValueError('Needs an even length array.') halflength = int(length/2) if not a.shape == b.shape: raise ValueError('The 2 arrays need to be the same shape') # Start by finding the coarse discretised arg_max coarse_max = np.argmax(np.correlate(a, b, mode='full')) - length+1 omega = gen_omega(length) fft_a = np.fft.fft(a) def correlate_point(tau): rotate_vec = np.exp(1j*tau*omega) rotate_vec[halflength] = np.cos(np.pi*tau) return np.sum((np.fft.ifft(fft_a*rotate_vec)).real*b) start_arg, end_arg = (float(coarse_max)-1, float(coarse_max)+1) max_arg = scipy.optimize.fminbound(lambda tau: -correlate_point(tau), start_arg, end_arg) return np.real(max_arg) def delay_signal(sig, delay): """Apply the delay calculated by arg_max_corr to sig""" frac_delay, int_delay = np.modf(delay) int_delay = int(int_delay) print("Correcting integer delay {}".format(int_delay)) sig = np.roll(sig, int_delay) print("Correcting fractional delay {}".format(frac_delay)) tau = -frac_delay omega = gen_omega(len(sig)) sig_fft = np.fft.fft(sig) rotate_vec = np.exp(1j*tau*omega) rotate_vec[int(len(sig)/2)] = np.cos(np.pi*tau) return np.fft.ifft(sig_fft * rotate_vec) def fftplot(sig): plt.plot(np.abs(np.fft.fftshift(np.fft.fft(sig)))) if __name__ == '__main__': # Add a delay of d/oversample samples to the input signal # by how much to oversample the signal before applying the delay oversample = 8 do_plot = False for d in [2, 7]: a, b = gen_signals(d, oversample) delay = arg_max_corr(a,b) print("{} {}".format(d / oversample, delay)) print("{} {}".format(d / oversample, arg_max_corr(a, delay_signal(b, delay)))) if do_plot: plt.figure() fftplot(a) fftplot(a-b) if do_plot: plt.show()