diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-02-04 17:59:20 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-02-04 17:59:20 +0100 |
commit | 936db3485eabc692e29f4249349fc351aad18675 (patch) | |
tree | 05583c1846b706bf3867cdf38a8cc5a5fd1f121c | |
parent | ce8ae723cf496b698a0f6395e43bc3b993081d5b (diff) | |
download | odr-dpd-936db3485eabc692e29f4249349fc351aad18675.tar.gz odr-dpd-936db3485eabc692e29f4249349fc351aad18675.tar.bz2 odr-dpd-936db3485eabc692e29f4249349fc351aad18675.zip |
Replace jupyter notebook by script file
-rw-r--r-- | align/GenerateExampleTxRxIQ.ipynb | 255 | ||||
-rwxr-xr-x | align/GenerateExampleTxRxIQ.py | 152 |
2 files changed, 152 insertions, 255 deletions
diff --git a/align/GenerateExampleTxRxIQ.ipynb b/align/GenerateExampleTxRxIQ.ipynb deleted file mode 100644 index 9ded3ec..0000000 --- a/align/GenerateExampleTxRxIQ.ipynb +++ /dev/null @@ -1,255 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Generate an example RX and TX dataset, with a subsample delay and try to resolve it afterwards" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Configuration" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false - }, - "outputs": [], - "source": [ - "# whether to correct for delays larger than one sample\n", - "# Not necessary unless you have delay larger than oversample/2\n", - "do_integer_compensation = 0\n", - "\n", - "# by how much to oversample the signal before applying the delay\n", - "oversample = 16\n", - "\n", - "# Add a delay of delay/oversample samples to the input signal\n", - "delay = 7\n", - "\n", - "print(\"Expecting a delay of {} samples\".format(delay/oversample))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Generate signal" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false - }, - "outputs": [], - "source": [ - "%matplotlib notebook\n", - "\n", - "import matplotlib.pyplot as plt\n", - "import scipy.signal\n", - "import numpy as np\n", - "\n", - "\n", - "iq_file = \"/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq\"\n", - "\n", - "iq_data = np.fromfile(iq_file, np.complex64)\n", - "\n", - "# oversampling the input signal doesn't make much of a difference\n", - "phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data))\n", - "\n", - "# make the signal periodic by duplicating the signal\n", - "phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq))\n", - "\n", - "noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02,\n", - " size=len(phase_ref_iq))\n", - "\n", - "phase_ref_iq = phase_ref_iq + noise_iq\n", - "\n", - "# exp(-2i pi f) is the Fourier transform of a unity delay.\n", - "# exp(2i pi f) is a negative delay.\n", - "bin_frequencies = np.concatenate(\n", - " (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False),\n", - " np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False)))\n", - "\n", - "phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq))\n", - "\n", - "\n", - "phase_ref_uc_delayed = np.roll(phase_ref_uc, delay)\n", - "\n", - "phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Part 1: integer delay" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false, - "scrolled": false - }, - "outputs": [], - "source": [ - "corr_begin_ix = -32\n", - "corr_end_ix = 32\n", - "\n", - "corr = [np.abs(np.corrcoef(phase_ref_delayed, np.roll(phase_ref_iq, i))[0,1]) \n", - " for i in range(corr_begin_ix, corr_end_ix)]\n", - "# TODO check for negative real correlation peak\n", - "if do_integer_compensation:\n", - " delay_in = np.argmax(corr) + corr_begin_ix\n", - "else:\n", - " delay_in = 0\n", - "\n", - "phase_ref_int_delay_removed = np.roll(phase_ref_delayed, -delay_in)\n", - "\n", - "print(\"Integer delay corrected: {}\".format(delay_in))" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Part 2: factional delay" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Calculate fractional delay" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false, - "scrolled": false - }, - "outputs": [], - "source": [ - "signal_fft = np.fft.fft(phase_ref_int_delay_removed)\n", - "reference_fft = np.fft.fft(phase_ref_iq)\n", - "\n", - "# rotate each bin backwards with the phase of the reference. As we have already resolved the\n", - "# integer delay, we should find at most one 2*pi wrapping.\n", - "u = signal_fft * np.conj(reference_fft)\n", - "\n", - "\n", - "\n", - "# the phase signal will still wrap around, and will have values between -pi/4 and pi/4\n", - "phase_wrapping = np.angle(u)\n", - "\n", - "unwrap_with_deriv_integrate = True\n", - "if unwrap_with_deriv_integrate:\n", - " # to unwrap, take the derivative, remove peaks, integrate\n", - " phase_deriv = phase_wrapping - np.roll(phase_wrapping, 1)\n", - "\n", - " def filter_phase_deriv(p):\n", - " if np.abs(p) < 0.5:\n", - " return p\n", - " else:\n", - " return 0\n", - " \n", - " phase_deriv_nopeaks = [filter_phase_deriv(p) for p in phase_deriv]\n", - " phase_unwrapped = np.cumsum(phase_deriv_nopeaks)\n", - "else:\n", - " # doesn't always work, sometimes there are smaller jumps in phase\n", - " phase_unwrapped = np.mod(phase_wrapping + np.pi/2, np.ones(len(phase_wrapping)) * np.pi / 2)\n", - "\n", - "\n", - "# Find the slope using a linear regression\n", - "slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(phase_unwrapped,range(len(phase_unwrapped)))\n", - "\n", - "if p_value < 0.05:\n", - " frac_delay = slope / len(phase_unwrapped)\n", - " frac_rotate = intercept / len(phase_unwrapped)\n", - " print(\"Applying subsample correction: {} {}\".format(frac_delay, frac_rotate))\n", - " print(slope, intercept, r_value, p_value, std_err) \n", - "else:\n", - " print(\"Skipping subsample correction\")\n", - " print(slope, intercept, r_value, p_value, std_err)\n", - " frac_delay = None\n", - "\n", - "plt.figure()\n", - "plt.plot(phase_wrapping)\n", - "plt.plot(phase_unwrapped)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false, - "scrolled": false - }, - "outputs": [], - "source": [ - "if frac_delay:\n", - " fine_shift_fft = np.exp((0+2j * np.pi * frac_delay) * bin_frequencies) * np.exp(0+2j * np.pi * frac_rotate)\n", - " sig_delay_removed_fft = signal_fft * fine_shift_fft\n", - " \n", - " sig_delay_removed = np.fft.ifft(sig_delay_removed_fft)\n", - " \n", - " plt.figure()\n", - " plt.plot(np.angle(np.fft.fftshift(fine_shift_fft)))\n", - " \n", - " plt.figure()\n", - " plt.plot(np.abs(np.fft.fftshift(np.fft.fft(phase_ref_iq))))\n", - " plt.plot(np.abs(np.fft.fftshift(np.fft.fft(sig_delay_removed-phase_ref_iq))))\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": false - }, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.0" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/align/GenerateExampleTxRxIQ.py b/align/GenerateExampleTxRxIQ.py new file mode 100755 index 0000000..dc8b572 --- /dev/null +++ b/align/GenerateExampleTxRxIQ.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python +# +# Generate an example RX and TX dataset, with a subsample delay and try to resolve it afterwards +# +# +# The MIT License (MIT) +# +# Copyright (c) 2017 Matthias P. Braendli +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import matplotlib.pyplot as plt +import scipy.signal +import numpy as np + +## Configuration +# whether to correct for delays larger than one sample +# Not necessary unless you have delay larger than oversample/2 +do_integer_compensation = 0 + +# by how much to oversample the signal before applying the delay +oversample = 16 + +# Add a delay of delay/oversample samples to the input signal +delay = 7 + +print("Expecting a delay of {} samples".format(delay/oversample)) + +## Generate signal + +iq_file = "/home/bram/dab/aux/odr-dab-cir/phasereference.2048000.fc64.iq" + +iq_data = np.fromfile(iq_file, np.complex64) + +# oversampling the input signal doesn't make much of a difference +phase_ref_iq = scipy.signal.resample(iq_data, 2 * len(iq_data)) + +# make the signal periodic by duplicating the signal +phase_ref_iq = np.concatenate((phase_ref_iq, phase_ref_iq)) + +noise_iq = np.random.normal(scale = np.max(np.abs(phase_ref_iq)) * 0.02, + size=len(phase_ref_iq)) + +phase_ref_iq = phase_ref_iq + noise_iq + +# exp(-2i pi f) is the Fourier transform of a unity delay. +# exp(2i pi f) is a negative delay. +bin_frequencies = np.concatenate( + (np.linspace(0, 0.5, len(phase_ref_iq)/2, endpoint=False), + np.linspace(-0.5, 0, len(phase_ref_iq)/2, endpoint=False))) + +phase_ref_uc = scipy.signal.resample(phase_ref_iq, oversample * len(phase_ref_iq)) + + +phase_ref_uc_delayed = np.roll(phase_ref_uc, delay) + +phase_ref_delayed = scipy.signal.resample(phase_ref_uc_delayed, len(phase_ref_iq)) + +## Integer delay +corr_begin_ix = -32 +corr_end_ix = 32 + +corr = [np.abs(np.corrcoef(phase_ref_delayed, np.roll(phase_ref_iq, i))[0,1]) + for i in range(corr_begin_ix, corr_end_ix)] +# TODO check for negative real correlation peak +if do_integer_compensation: + delay_in = np.argmax(corr) + corr_begin_ix +else: + delay_in = 0 + +phase_ref_int_delay_removed = np.roll(phase_ref_delayed, -delay_in) + +print("Integer delay corrected: {}".format(delay_in)) + +## Fractional delay +signal_fft = np.fft.fft(phase_ref_int_delay_removed) +reference_fft = np.fft.fft(phase_ref_iq) + +# rotate each bin backwards with the phase of the reference. As we have already resolved the +# integer delay, we should find at most one 2*pi wrapping. +u = signal_fft * np.conj(reference_fft) + + +# the phase signal will still wrap around, and will have values between -pi/4 and pi/4 +phase_wrapping = np.angle(u) + +unwrap_with_deriv_integrate = True +if unwrap_with_deriv_integrate: + # to unwrap, take the derivative, remove peaks, integrate + phase_deriv = phase_wrapping - np.roll(phase_wrapping, 1) + + def filter_phase_deriv(p): + if np.abs(p) < 0.5: + return p + else: + return 0 + + phase_deriv_nopeaks = [filter_phase_deriv(p) for p in phase_deriv] + phase_unwrapped = np.cumsum(phase_deriv_nopeaks) +else: + # doesn't always work, sometimes there are smaller jumps in phase + phase_unwrapped = np.mod(phase_wrapping + np.pi/2, np.ones(len(phase_wrapping)) * np.pi / 2) + + +# Find the slope using a linear regression +slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(phase_unwrapped,range(len(phase_unwrapped))) + +if p_value < 0.05: + frac_delay = slope / len(phase_unwrapped) + frac_rotate = intercept / len(phase_unwrapped) + print("Applying subsample correction: {} {}".format(frac_delay, frac_rotate)) + print(slope, intercept, r_value, p_value, std_err) +else: + print("Skipping subsample correction") + print(slope, intercept, r_value, p_value, std_err) + frac_delay = None + +plt.figure() +plt.plot(phase_wrapping) +plt.plot(phase_unwrapped) + + +if frac_delay: + fine_shift_fft = np.exp((0+2j * np.pi * frac_delay) * bin_frequencies) * np.exp(0+2j * np.pi * frac_rotate) + sig_delay_removed_fft = signal_fft * fine_shift_fft + + sig_delay_removed = np.fft.ifft(sig_delay_removed_fft) + + plt.figure() + plt.plot(np.angle(np.fft.fftshift(fine_shift_fft))) + + plt.figure() + plt.plot(np.abs(np.fft.fftshift(np.fft.fft(phase_ref_iq)))) + plt.plot(np.abs(np.fft.fftshift(np.fft.fft(sig_delay_removed-phase_ref_iq)))) + +plt.show() |