aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-02-04 17:59:20 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-02-04 17:59:20 +0100
commit936db3485eabc692e29f4249349fc351aad18675 (patch)
tree05583c1846b706bf3867cdf38a8cc5a5fd1f121c
parentce8ae723cf496b698a0f6395e43bc3b993081d5b (diff)
downloadodr-dpd-936db3485eabc692e29f4249349fc351aad18675.tar.gz
odr-dpd-936db3485eabc692e29f4249349fc351aad18675.tar.bz2
odr-dpd-936db3485eabc692e29f4249349fc351aad18675.zip
Replace jupyter notebook by script file
-rw-r--r--align/GenerateExampleTxRxIQ.ipynb255
-rwxr-xr-xalign/GenerateExampleTxRxIQ.py152
2 files changed, 152 insertions, 255 deletions
diff --git a/align/GenerateExampleTxRxIQ.ipynb b/align/GenerateExampleTxRxIQ.ipynb
deleted file mode 100644
index 9ded3ec..0000000
--- a/align/GenerateExampleTxRxIQ.ipynb
+++ /dev/null
@@ -1,255 +0,0 @@
-{
- "cells": [
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "# Generate an example RX and TX dataset, with a subsample delay and try to resolve it afterwards"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Configuration"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false
- },
- "outputs": [],
- "source": [
- "# whether to correct for delays larger than one sample\n",
- "# Not necessary unless you have delay larger than oversample/2\n",
- "do_integer_compensation = 0\n",
- "\n",
- "# by how much to oversample the signal before applying the delay\n",
- "oversample = 16\n",
- "\n",
- "# Add a delay of delay/oversample samples to the input signal\n",
- "delay = 7\n",
- "\n",
- "print(\"Expecting a delay of {} samples\".format(delay/oversample))"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Generate signal"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false
- },
- "outputs": [],
- "source": [
- "%matplotlib notebook\n",
- "\n",
- "import matplotlib.pyplot as plt\n",
- "import scipy.signal\n",
- "import numpy as np\n",
- "\n",
- "\n",
- "iq_file = \"/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq\"\n",
- "\n",
- "iq_data = np.fromfile(iq_file, np.complex64)\n",
- "\n",
- "# oversampling the input signal doesn't make much of a difference\n",
- "phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data))\n",
- "\n",
- "# make the signal periodic by duplicating the signal\n",
- "phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq))\n",
- "\n",
- "noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02,\n",
- " size=len(phase_ref_iq))\n",
- "\n",
- "phase_ref_iq = phase_ref_iq + noise_iq\n",
- "\n",
- "# exp(-2i pi f) is the Fourier transform of a unity delay.\n",
- "# exp(2i pi f) is a negative delay.\n",
- "bin_frequencies = np.concatenate(\n",
- " (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False),\n",
- " np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False)))\n",
- "\n",
- "phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq))\n",
- "\n",
- "\n",
- "phase_ref_uc_delayed = np.roll(phase_ref_uc, delay)\n",
- "\n",
- "phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq))"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Part 1: integer delay"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false,
- "scrolled": false
- },
- "outputs": [],
- "source": [
- "corr_begin_ix = -32\n",
- "corr_end_ix = 32\n",
- "\n",
- "corr = [np.abs(np.corrcoef(phase_ref_delayed, np.roll(phase_ref_iq, i))[0,1]) \n",
- " for i in range(corr_begin_ix, corr_end_ix)]\n",
- "# TODO check for negative real correlation peak\n",
- "if do_integer_compensation:\n",
- " delay_in = np.argmax(corr) + corr_begin_ix\n",
- "else:\n",
- " delay_in = 0\n",
- "\n",
- "phase_ref_int_delay_removed = np.roll(phase_ref_delayed, -delay_in)\n",
- "\n",
- "print(\"Integer delay corrected: {}\".format(delay_in))"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "## Part 2: factional delay"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "### Calculate fractional delay"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false,
- "scrolled": false
- },
- "outputs": [],
- "source": [
- "signal_fft = np.fft.fft(phase_ref_int_delay_removed)\n",
- "reference_fft = np.fft.fft(phase_ref_iq)\n",
- "\n",
- "# rotate each bin backwards with the phase of the reference. As we have already resolved the\n",
- "# integer delay, we should find at most one 2*pi wrapping.\n",
- "u = signal_fft * np.conj(reference_fft)\n",
- "\n",
- "\n",
- "\n",
- "# the phase signal will still wrap around, and will have values between -pi/4 and pi/4\n",
- "phase_wrapping = np.angle(u)\n",
- "\n",
- "unwrap_with_deriv_integrate = True\n",
- "if unwrap_with_deriv_integrate:\n",
- " # to unwrap, take the derivative, remove peaks, integrate\n",
- " phase_deriv = phase_wrapping - np.roll(phase_wrapping, 1)\n",
- "\n",
- " def filter_phase_deriv(p):\n",
- " if np.abs(p) < 0.5:\n",
- " return p\n",
- " else:\n",
- " return 0\n",
- " \n",
- " phase_deriv_nopeaks = [filter_phase_deriv(p) for p in phase_deriv]\n",
- " phase_unwrapped = np.cumsum(phase_deriv_nopeaks)\n",
- "else:\n",
- " # doesn't always work, sometimes there are smaller jumps in phase\n",
- " phase_unwrapped = np.mod(phase_wrapping + np.pi/2, np.ones(len(phase_wrapping)) * np.pi / 2)\n",
- "\n",
- "\n",
- "# Find the slope using a linear regression\n",
- "slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(phase_unwrapped,range(len(phase_unwrapped)))\n",
- "\n",
- "if p_value < 0.05:\n",
- " frac_delay = slope / len(phase_unwrapped)\n",
- " frac_rotate = intercept / len(phase_unwrapped)\n",
- " print(\"Applying subsample correction: {} {}\".format(frac_delay, frac_rotate))\n",
- " print(slope, intercept, r_value, p_value, std_err) \n",
- "else:\n",
- " print(\"Skipping subsample correction\")\n",
- " print(slope, intercept, r_value, p_value, std_err)\n",
- " frac_delay = None\n",
- "\n",
- "plt.figure()\n",
- "plt.plot(phase_wrapping)\n",
- "plt.plot(phase_unwrapped)\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false,
- "scrolled": false
- },
- "outputs": [],
- "source": [
- "if frac_delay:\n",
- " fine_shift_fft = np.exp((0+2j * np.pi * frac_delay) * bin_frequencies) * np.exp(0+2j * np.pi * frac_rotate)\n",
- " sig_delay_removed_fft = signal_fft * fine_shift_fft\n",
- " \n",
- " sig_delay_removed = np.fft.ifft(sig_delay_removed_fft)\n",
- " \n",
- " plt.figure()\n",
- " plt.plot(np.angle(np.fft.fftshift(fine_shift_fft)))\n",
- " \n",
- " plt.figure()\n",
- " plt.plot(np.abs(np.fft.fftshift(np.fft.fft(phase_ref_iq))))\n",
- " plt.plot(np.abs(np.fft.fftshift(np.fft.fft(sig_delay_removed-phase_ref_iq))))\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false
- },
- "outputs": [],
- "source": []
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": true
- },
- "outputs": [],
- "source": []
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "Python 3",
- "language": "python",
- "name": "python3"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.6.0"
- }
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
diff --git a/align/GenerateExampleTxRxIQ.py b/align/GenerateExampleTxRxIQ.py
new file mode 100755
index 0000000..dc8b572
--- /dev/null
+++ b/align/GenerateExampleTxRxIQ.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+#
+# Generate an example RX and TX dataset, with a subsample delay and try to resolve it afterwards
+#
+#
+# The MIT License (MIT)
+#
+# Copyright (c) 2017 Matthias P. Braendli
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import matplotlib.pyplot as plt
+import scipy.signal
+import numpy as np
+
+## Configuration
+# whether to correct for delays larger than one sample
+# Not necessary unless you have delay larger than oversample/2
+do_integer_compensation = 0
+
+# by how much to oversample the signal before applying the delay
+oversample = 16
+
+# Add a delay of delay/oversample samples to the input signal
+delay = 7
+
+print("Expecting a delay of {} samples".format(delay/oversample))
+
+## Generate signal
+
+iq_file = "/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq"
+
+iq_data = np.fromfile(iq_file, np.complex64)
+
+# oversampling the input signal doesn't make much of a difference
+phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data))
+
+# make the signal periodic by duplicating the signal
+phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq))
+
+noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02,
+ size=len(phase_ref_iq))
+
+phase_ref_iq = phase_ref_iq + noise_iq
+
+# exp(-2i pi f) is the Fourier transform of a unity delay.
+# exp(2i pi f) is a negative delay.
+bin_frequencies = np.concatenate(
+ (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False),
+ np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False)))
+
+phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq))
+
+
+phase_ref_uc_delayed = np.roll(phase_ref_uc, delay)
+
+phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq))
+
+## Integer delay
+corr_begin_ix = -32
+corr_end_ix = 32
+
+corr = [np.abs(np.corrcoef(phase_ref_delayed, np.roll(phase_ref_iq, i))[0,1])
+ for i in range(corr_begin_ix, corr_end_ix)]
+# TODO check for negative real correlation peak
+if do_integer_compensation:
+ delay_in = np.argmax(corr) + corr_begin_ix
+else:
+ delay_in = 0
+
+phase_ref_int_delay_removed = np.roll(phase_ref_delayed, -delay_in)
+
+print("Integer delay corrected: {}".format(delay_in))
+
+## Fractional delay
+signal_fft = np.fft.fft(phase_ref_int_delay_removed)
+reference_fft = np.fft.fft(phase_ref_iq)
+
+# rotate each bin backwards with the phase of the reference. As we have already resolved the
+# integer delay, we should find at most one 2*pi wrapping.
+u = signal_fft * np.conj(reference_fft)
+
+
+# the phase signal will still wrap around, and will have values between -pi/4 and pi/4
+phase_wrapping = np.angle(u)
+
+unwrap_with_deriv_integrate = True
+if unwrap_with_deriv_integrate:
+ # to unwrap, take the derivative, remove peaks, integrate
+ phase_deriv = phase_wrapping - np.roll(phase_wrapping, 1)
+
+ def filter_phase_deriv(p):
+ if np.abs(p) < 0.5:
+ return p
+ else:
+ return 0
+
+ phase_deriv_nopeaks = [filter_phase_deriv(p) for p in phase_deriv]
+ phase_unwrapped = np.cumsum(phase_deriv_nopeaks)
+else:
+ # doesn't always work, sometimes there are smaller jumps in phase
+ phase_unwrapped = np.mod(phase_wrapping + np.pi/2, np.ones(len(phase_wrapping)) * np.pi / 2)
+
+
+# Find the slope using a linear regression
+slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(phase_unwrapped,range(len(phase_unwrapped)))
+
+if p_value < 0.05:
+ frac_delay = slope / len(phase_unwrapped)
+ frac_rotate = intercept / len(phase_unwrapped)
+ print("Applying subsample correction: {} {}".format(frac_delay, frac_rotate))
+ print(slope, intercept, r_value, p_value, std_err)
+else:
+ print("Skipping subsample correction")
+ print(slope, intercept, r_value, p_value, std_err)
+ frac_delay = None
+
+plt.figure()
+plt.plot(phase_wrapping)
+plt.plot(phase_unwrapped)
+
+
+if frac_delay:
+ fine_shift_fft = np.exp((0+2j * np.pi * frac_delay) * bin_frequencies) * np.exp(0+2j * np.pi * frac_rotate)
+ sig_delay_removed_fft = signal_fft * fine_shift_fft
+
+ sig_delay_removed = np.fft.ifft(sig_delay_removed_fft)
+
+ plt.figure()
+ plt.plot(np.angle(np.fft.fftshift(fine_shift_fft)))
+
+ plt.figure()
+ plt.plot(np.abs(np.fft.fftshift(np.fft.fft(phase_ref_iq))))
+ plt.plot(np.abs(np.fft.fftshift(np.fft.fft(sig_delay_removed-phase_ref_iq))))
+
+plt.show()