aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorandreas128 <Andreas>2017-02-26 00:22:17 +0000
committerandreas128 <Andreas>2017-02-26 00:22:17 +0000
commit2476ba7e03035caa1db7ad8a0dc6718501680c44 (patch)
tree98dfad3b0976186cbaebcce06bbd04131da0f2d7 /src
parent6e0b2512e45b7a6ca03187814742cb0fe08964cb (diff)
downloadODR-StaticPrecorrection-2476ba7e03035caa1db7ad8a0dc6718501680c44.tar.gz
ODR-StaticPrecorrection-2476ba7e03035caa1db7ad8a0dc6718501680c44.tar.bz2
ODR-StaticPrecorrection-2476ba7e03035caa1db7ad8a0dc6718501680c44.zip
Add sub sample accurate fft lag finder
Diffstat (limited to 'src')
-rw-r--r--src/dab_util.py96
-rw-r--r--src/fft_lag.py165
-rw-r--r--src/visualize.py51
3 files changed, 248 insertions, 64 deletions
diff --git a/src/dab_util.py b/src/dab_util.py
index aa2030f..8ab1e03 100644
--- a/src/dab_util.py
+++ b/src/dab_util.py
@@ -1,6 +1,7 @@
import numpy as np
import scipy
import matplotlib.pyplot as plt
+import fftconvolve
c = {
"bw":1536000
@@ -41,67 +42,34 @@ def crop_signal(signal, n_window = 1000, n_zeros = 120000, debug = False):
signal = signal[max(0,idx_start - n_zeros): min(idx_end + n_zeros, signal.shape[0] -1)]
return signal
-def lagcorr(x,y,lag=None,verbose=False):
- '''Compute lead-lag correlations between 2 time series.
-
- <x>,<y>: 1-D time series.
- <lag>: lag option, could take different forms of <lag>:
- if 0 or None, compute ordinary correlation and p-value;
- if positive integer, compute lagged correlation with lag
- upto <lag>;
- if negative integer, compute lead correlation with lead
- upto <-lag>;
- if pass in an list or tuple or array of integers, compute
- lead/lag correlations at different leads/lags.
-
- Note: when talking about lead/lag, uses <y> as a reference.
- Therefore positive lag means <x> lags <y> by <lag>, computation is
- done by shifting <x> to the left hand side by <lag> with respect to
- <y>.
- Similarly negative lag means <x> leads <y> by <lag>, computation is
- done by shifting <x> to the right hand side by <lag> with respect to
- <y>.
-
- Return <result>: a (n*2) array, with 1st column the correlation
- coefficients, 2nd column correpsonding p values.
-
- Currently only works for 1-D arrays.
- '''
-
- import numpy
- from scipy.stats import pearsonr
-
- if len(x)!=len(y):
- raise Exception('Input variables of different lengths.')
-
- #--------Unify types of <lag>-------------
- if numpy.isscalar(lag):
- if abs(lag)>=len(x):
- raise Exception('Maximum lag equal or larger than array.')
- if lag<0:
- lag=-numpy.arange(abs(lag)+1)
- elif lag==0:
- lag=[0,]
- else:
- lag=numpy.arange(lag+1)
- elif lag is None:
- lag=[0,]
- else:
- lag=numpy.asarray(lag)
-
- #-------Loop over lags---------------------
- result=[]
- if verbose:
- print '\n#<lagcorr>: Computing lagged-correlations at lags:',lag
-
- for ii in lag:
- if ii<0:
- result.append(pearsonr(x[:ii],y[-ii:]))
- elif ii==0:
- result.append(pearsonr(x,y))
- elif ii>0:
- result.append(pearsonr(x[ii:],y[:-ii]))
-
- result=numpy.asarray(result)
-
- return result
+#def fftlag(signal_original, signal_rec):
+# """
+# Efficient way to find lag between two signals
+# Args:
+# signal_original: The signal that has been sent
+# signal_rec: The signal that has been recored
+# """
+# c = np.flipud(scipy.signal.fftconvolve(signal_original,np.flipud(signal_rec)))
+# #plt.plot(c)
+# return np.argmax(c) - signal_original.shape[0] + 1
+
+def fftlag(signal_original, signal_rec, n_upsampling = 1):
+ """
+ Efficient way to find lag between two signals
+ Args:
+ signal_original: The signal that has been sent
+ signal_rec: The signal that has been recored
+ """
+ c = np.flipud(fftconvolve.fftconvolve(signal_original,np.flipud(signal_rec), n_upsampling))
+ #plt.plot(c)
+ return (np.argmax(c) - signal_original.shape[0] + 1)
+
+def get_amp_ratio(ampl_1, ampl_2, a_out_abs, a_in_abs):
+ idxs = (a_in_abs > ampl_1) & (a_in_abs < ampl_2)
+ ratio = a_out_abs[idxs] / a_in_abs[idxs]
+ return ratio.mean(), ratio.var()
+
+def get_phase(ampl_1, ampl_2, a_out, a_in):
+ idxs = (np.abs(a_in) > ampl_1) & (np.abs(a_in) < ampl_2)
+ ratio = np.angle(a_out[idxs], deg=True) - np.angle(a_in[idxs], deg=True)
+ return ratio.mean(), ratio.var()
diff --git a/src/fft_lag.py b/src/fft_lag.py
new file mode 100644
index 0000000..2d8d733
--- /dev/null
+++ b/src/fft_lag.py
@@ -0,0 +1,165 @@
+import numpy as np
+from numpy.fft import rfftn, irfftn
+from scipy.fftpack import (fft, ifft, ifftshift, fft2, ifft2, fftn,
+ ifftn, fftfreq)
+from scipy._lib._version import NumpyVersion
+from scipy import signal
+
+########################################
+##### Test functions
+########################################
+def create_triangular_chirp(f = 0.2, periods = 100):
+ #f ... per sample
+ t_max = periods / float(f)
+ t = np.arange(t_max)
+ sig = signal.chirp(t, 0, t_max, f)
+ triangle = np.linspace(0, t_max, num = t_max) / t_max
+ sig = np.multiply(sig, triangle)
+ return sig
+
+def add_delay_and_padding(sig, delay=0, padding=0):
+ n_sig = sig.shape[0]
+ n_total = n_sig + delay + padding
+ ret = np.zeros((n_total), dtype=sig.dtype)
+ ret[delay:delay + n_sig] = sig
+ return ret
+
+def create_test_signal(f=0.05, periods=100, delay=0, padding=0):
+ return add_delay_and_padding(create_triangular_chirp(f=f, periods=periods),
+ delay=delay, padding=padding)
+
+def visualize_test_signal(f=0.05, periods=100, delay=10, padding=100):
+ plt.plot(create_test_signal(f=f, periods=periods, delay=delay, padding=padding))
+
+def down_sample(sig, n_every):
+ return sig[::n_every]
+
+
+
+_rfft_mt_safe = (NumpyVersion(np.__version__) >= '1.9.0.dev-e24486e')
+
+def _next_regular(target):
+ """
+ Find the next regular number greater than or equal to target.
+ Regular numbers are composites of the prime factors 2, 3, and 5.
+ Also known as 5-smooth numbers or Hamming numbers, these are the optimal
+ size for inputs to FFTPACK.
+ Target must be a positive integer.
+ """
+ if target <= 6:
+ return target
+
+ # Quickly check if it's already a power of 2
+ if not (target & (target-1)):
+ return target
+
+ match = float('inf') # Anything found will be smaller
+ p5 = 1
+ while p5 < target:
+ p35 = p5
+ while p35 < target:
+ # Ceiling integer division, avoiding conversion to float
+ # (quotient = ceil(target / p35))
+ quotient = -(-target // p35)
+
+ # Quickly find next power of 2 >= quotient
+ try:
+ p2 = 2**((quotient - 1).bit_length())
+ except AttributeError:
+ # Fallback for Python <2.7
+ p2 = 2**(len(bin(quotient - 1)) - 2)
+
+ N = p2 * p35
+ if N == target:
+ return N
+ elif N < match:
+ match = N
+ p35 *= 3
+ if p35 == target:
+ return p35
+ if p35 < match:
+ match = p35
+ p5 *= 5
+ if p5 == target:
+ return p5
+ if p5 < match:
+ match = p5
+ return match
+
+def fft_lag(s1, s2, n_up = 1, debug=False):
+ if debug:
+ import matplotlib.pyplot as plt
+ s1 = np.flipud(s1) #mult becomes convolution -> filp to get correlation
+
+ sh1 = np.array(s1.shape)
+ sh2 = np.array(s2.shape)
+ complex_result = (np.issubdtype(s1.dtype, np.complex) or
+ np.issubdtype(s2.dtype, np.complex))
+ shape = (sh1 + sh2 - 1)
+
+ fshape = [_next_regular(int(d)) for d in shape]
+ fslice = tuple([slice(0, int(sz)) for sz in shape])
+
+ def upsample_fft(s_fft, n_up):
+ n = s_fft.shape[0]
+ dtype = s_fft.dtype
+ ret = None
+ if n % 2 == 0:
+ ret = np.zeros((n*n_up), dtype=dtype)
+ ret[:n/2] = s_fft[0:n/2]
+ ret[-n/2:] = s_fft[-n/2:]
+ else:
+ ret = np.zeros((n*n_up), dtype=dtype)
+ ret[:n/2] = s_fft[0:n/2]
+ ret[n/2+1] = s_fft[n/2+1] / 2.
+ ret[-(n/2+1)] = s_fft[n/2+1] / 2.
+ ret[-n/2:] = s_fft[-n/2:]
+ ret = ret * n_up
+ return ret
+
+ s1_fft = np.fft.fftn(s1, fshape)
+ s2_fft = np.fft.fftn(s2, fshape)
+ s1_s2_fft = upsample_fft(s1_fft * s2_fft, n_up)
+ s1_s2 = np.fft.ifftn(s1_s2_fft)
+
+ delay = np.argmax(s1_s2) / float(n_up) - sh1[0] + 1
+ #peak of correlation - size of original signal (robust against padding at the end)
+
+ if debug:
+ plt.subplot(411); plt.plot(s1_fft)
+ plt.subplot(412); plt.plot(s2_fft)
+ plt.subplot(413); plt.plot(s1_s2_fft)
+ plt.subplot(414); plt.plot(s1_s2)
+ plt.show()
+ print(s1_s2.shape, s1_fft.shape, s2_fft.shape, fshape, fslice)
+
+
+ #return np.argmax(s1_s2)/float(n_up)
+ #return np.argmax(s1_s2)/float(n_up) - s2.shape[0] + 1
+ return delay
+
+def fft_lag_random_test(n_tests=1000):
+ def r():
+ return np.random.randint(0, 1000)
+ def rand(n):
+ return [np.random.randint(0, 1000) for i in range(n)]
+
+ for i in range(n_tests):
+ debug = (i == 0)
+ d1, d2, p1, p2 = rand(4)
+ n_down = 5
+ n_up = 32
+ sig1 = down_sample(create_test_signal(delay=d1, padding=p1), n_down)
+ sig2 = down_sample(create_test_signal(delay=d2, padding=p2), n_down)
+
+ d1, d2, p1, p2 = [x/float(n_down) for x in [d1, d2, p1, p2]]
+ delay = d2 - d1
+ delay_meas = fft_lag(sig1, sig2, n_up=n_up, debug = debug)
+ tol = 1./n_up
+ error = abs(delay - delay_meas)
+ assert(error < tol)
+ print("%d tests within tolerance" % n_tests)
+
+
+if __name__ == "__main__":
+ fft_lag_random_test()
diff --git a/src/visualize.py b/src/visualize.py
new file mode 100644
index 0000000..3762911
--- /dev/null
+++ b/src/visualize.py
@@ -0,0 +1,51 @@
+import matplotlib.pyplot as plt
+from scipy import signal
+
+
+def visualize_sync_signal(s1, s2, delay, offset=0, window_size = 20, over_sampling = 10):
+ os = over_sampling
+ s1_show = signal.resample(s1, s1.shape[0]*os)
+ s2_show = signal.resample(s2, s2.shape[0]*os)
+ if(delay < 0):
+ print("negativ delay", delay)
+ delay_abs = abs(delay)
+ s1_idx_start = offset + delay_abs
+ s1_idx_end = offset + window_size + delay_abs
+ s2_idx_start = offset
+ s2_idx_end = offset + window_size
+
+ s1_idx_start = int(s1_idx_start * os)
+ s1_idx_end = int(s1_idx_end * os)
+ s2_idx_start = int(s2_idx_start * os)
+ s2_idx_end = int(s2_idx_end * os)
+
+ plt.plot(1 + s1_show[s1_idx_start:s1_idx_end], label = "s1")
+ plt.plot(-1 + s2_show[s2_idx_start:s2_idx_end], label = "s2")
+ plt.legend()
+ plt.show()
+ elif(delay >= 0):
+ print("positive delay", delay)
+ s1_idx_start = offset
+ s1_idx_end = offset + window_size
+ s2_idx_start = offset + delay
+ s2_idx_end = offset + window_size + delay
+
+ s1_idx_start = int(s1_idx_start * os)
+ s1_idx_end = int(s1_idx_end * os)
+ s2_idx_start = int(s2_idx_start * os)
+ s2_idx_end = int(s2_idx_end * os)
+
+ plt.plot(1 + s1_show[s1_idx_start:s1_idx_end], label = "s1")
+ plt.plot(-1 + s2_show[s2_idx_start:s2_idx_end], label = "s2")
+ plt.legend()
+ plt.show()
+
+def visualize_signals(s1, s2, offset=0, window_size = 20):
+ s1_idx_start = offset
+ s1_idx_end = offset + window_size
+ s2_idx_start = offset
+ s2_idx_end = offset + window_size
+
+ plt.subplot(211); plt.plot(s1[s1_idx_start:s1_idx_end], label = "s1"), plt.legend()
+ plt.subplot(212); plt.plot(s2[s2_idx_start:s2_idx_end], label = "s2"), plt.legend()
+ plt.show()