aboutsummaryrefslogtreecommitdiffstats
path: root/python/gui/dpd/Align.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/gui/dpd/Align.py')
-rw-r--r--python/gui/dpd/Align.py166
1 files changed, 166 insertions, 0 deletions
diff --git a/python/gui/dpd/Align.py b/python/gui/dpd/Align.py
new file mode 100644
index 0000000..1634ec8
--- /dev/null
+++ b/python/gui/dpd/Align.py
@@ -0,0 +1,166 @@
+# -*- coding: utf-8 -*-
+#
+# DPD Computation Engine, utility to do subsample alignment.
+#
+# Copyright (c) 2017 Andreas Steger
+# Copyright (c) 2018 Matthias P. Braendli
+#
+# http://www.opendigitalradio.org
+#
+# This file is part of ODR-DabMod.
+#
+# ODR-DabMod is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# ODR-DabMod is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+import datetime
+import os
+import numpy as np
+from scipy import optimize
+import matplotlib.pyplot as plt
+
+def gen_omega(length):
+ if (length % 2) == 1:
+ raise ValueError("Needs an even length array.")
+
+ halflength = int(length / 2)
+ factor = 2.0 * np.pi / length
+
+ omega = np.zeros(length, dtype=np.float)
+ for i in range(halflength):
+ omega[i] = factor * i
+
+ for i in range(halflength, length):
+ omega[i] = factor * (i - length)
+
+ return omega
+
+
+def subsample_align(sig, ref_sig, plot_location=None):
+ """Do subsample alignment for sig relative to the reference signal
+ ref_sig. The delay between the two must be less than sample
+
+ Returns the aligned signal"""
+
+ n = len(sig)
+ if (n % 2) == 1:
+ raise ValueError("Needs an even length signal.")
+ halflen = int(n / 2)
+
+ fft_sig = np.fft.fft(sig)
+
+ omega = gen_omega(n)
+
+ def correlate_for_delay(tau):
+ # A subsample offset between two signals corresponds, in the frequency
+ # domain, to a linearly increasing phase shift, whose slope
+ # corresponds to the delay.
+ #
+ # Here, we build this phase shift in rotate_vec, and multiply it with
+ # our signal.
+
+ rotate_vec = np.exp(1j * tau * omega)
+ # zero-frequency is rotate_vec[0], so rotate_vec[N/2] is the
+ # bin corresponding to the [-1, 1, -1, 1, ...] time signal, which
+ # is both the maximum positive and negative frequency.
+ # I don't remember why we handle it differently.
+ rotate_vec[halflen] = np.cos(np.pi * tau)
+
+ corr_sig = np.fft.ifft(rotate_vec * fft_sig)
+
+ return -np.abs(np.sum(np.conj(corr_sig) * ref_sig))
+
+ optim_result = optimize.minimize_scalar(correlate_for_delay, bounds=(-1, 1), method='bounded',
+ options={'disp': True})
+
+ if optim_result.success:
+ best_tau = optim_result.x
+
+ if plot_location is not None:
+ corr = np.vectorize(correlate_for_delay)
+ ixs = np.linspace(-1, 1, 100)
+ taus = corr(ixs)
+
+ dt = datetime.datetime.now().isoformat()
+ tau_path = (plot_location + "/" + dt + "_tau.png")
+ plt.plot(ixs, taus)
+ plt.title("Subsample correlation, minimum is best: {}".format(best_tau))
+ plt.savefig(tau_path)
+ plt.close()
+
+ # Prepare rotate_vec = fft_sig with rotated phase
+ rotate_vec = np.exp(1j * best_tau * omega)
+ rotate_vec[halflen] = np.cos(np.pi * best_tau)
+ return np.fft.ifft(rotate_vec * fft_sig).astype(np.complex64)
+ else:
+ # print("Could not optimize: " + optim_result.message)
+ return np.zeros(0, dtype=np.complex64)
+
+def phase_align(sig, ref_sig, plot_location=None):
+ """Do phase alignment for sig relative to the reference signal
+ ref_sig.
+
+ Returns the aligned signal"""
+
+ angle_diff = (np.angle(sig) - np.angle(ref_sig)) % (2. * np.pi)
+
+ real_diffs = np.cos(angle_diff)
+ imag_diffs = np.sin(angle_diff)
+
+ if plot_location is not None:
+ dt = datetime.datetime.now().isoformat()
+ fig_path = plot_location + "/" + dt + "_phase_align.png"
+
+ plt.subplot(511)
+ plt.hist(angle_diff, bins=60, label="Angle Diff")
+ plt.xlabel("Angle")
+ plt.ylabel("Count")
+ plt.legend(loc=4)
+
+ plt.subplot(512)
+ plt.hist(real_diffs, bins=60, label="Real Diff")
+ plt.xlabel("Real Part")
+ plt.ylabel("Count")
+ plt.legend(loc=4)
+
+ plt.subplot(513)
+ plt.hist(imag_diffs, bins=60, label="Imaginary Diff")
+ plt.xlabel("Imaginary Part")
+ plt.ylabel("Count")
+ plt.legend(loc=4)
+
+ plt.subplot(514)
+ plt.plot(np.angle(ref_sig[:128]), label="ref_sig")
+ plt.plot(np.angle(sig[:128]), label="sig")
+ plt.xlabel("Angle")
+ plt.ylabel("Sample")
+ plt.legend(loc=4)
+
+ real_diff = np.median(real_diffs)
+ imag_diff = np.median(imag_diffs)
+
+ angle = np.angle(real_diff + 1j * imag_diff)
+
+ #logging.debug( "Compensating phase by {} rad, {} degree. real median {}, imag median {}".format( angle, angle*180./np.pi, real_diff, imag_diff))
+ sig = sig * np.exp(1j * -angle)
+
+ if plot_location is not None:
+ plt.subplot(515)
+ plt.plot(np.angle(ref_sig[:128]), label="ref_sig")
+ plt.plot(np.angle(sig[:128]), label="sig")
+ plt.xlabel("Angle")
+ plt.ylabel("Sample")
+ plt.legend(loc=4)
+ plt.tight_layout()
+ plt.savefig(fig_path)
+ plt.close()
+
+ return sig