diff options
| -rw-r--r-- | dpd/src/Symbol_align.py | 191 | 
1 files changed, 191 insertions, 0 deletions
diff --git a/dpd/src/Symbol_align.py b/dpd/src/Symbol_align.py new file mode 100644 index 0000000..05a9049 --- /dev/null +++ b/dpd/src/Symbol_align.py @@ -0,0 +1,191 @@ +# -*- coding: utf-8 -*- +# +# Modulation Error Rate +# +# http://www.opendigitalradio.org +# Licence: The MIT License, see notice at the end of this file + +import datetime +import os +import logging +import time +try: +    logging_path = os.path.dirname(logging.getLoggerClass().root.handlers[0].baseFilename) +except: +    logging_path = "/tmp/" + +import numpy as np +import src.const +import scipy +import matplotlib +matplotlib.use('agg') +import matplotlib.pyplot as plt + +class Symbol_align: +    """ +    Find the phase offset to the start of the DAB symbols in an +    unaligned dab signal. +    """ +    def __init__(self, sample_rate): +        self.c = src.const.const(sample_rate) +        pass + +    def _calc_offset_to_first_symbol_without_prefix(self, tx, debug=False): +        tx_orig = tx[0:-self.c.T_U] +        tx_cut_prefix = tx[self.c.T_U:] + +        tx_product = np.abs(tx_orig - tx_cut_prefix) +        tx_product_avg = np.correlate( +            tx_product, +            np.ones(self.c.T_C), +            mode='valid') +        tx_product_avg_min_filt = \ +            scipy.ndimage.filters.minimum_filter1d( +                tx_product_avg, +                int(1.5 * self.c.T_S) +            ) +        peaks = np.ravel(np.where(tx_product_avg == tx_product_avg_min_filt)) + +        offset = peaks[np.argmin([tx_product_avg[peak] for peak in peaks])] + +        if debug: +            fig = plt.figure(figsize=(9, 9)) + +            ax = fig.add_subplot(4, 1, 1) +            ax.plot(tx_product) +            ylim = ax.get_ylim() +            for peak in peaks: +                ax.plot((peak, peak), (ylim[0], ylim[1])) +                if peak == offset: +                    ax.text(peak, ylim[0] + 0.3 * np.diff(ylim), "offset", rotation=90) +                else: +                    ax.text(peak, ylim[0] + 0.2 * np.diff(ylim), "peak", rotation=90) +            ax.set_xlabel("Sample") +            ax.set_ylabel("Conj. Product") +            ax.set_title("Difference with shifted self") + +            ax = fig.add_subplot(4, 1, 2) +            ax.plot(tx_product_avg) +            ylim = ax.get_ylim() +            for peak in peaks: +                ax.plot((peak, peak), (ylim[0], ylim[1])) +                if peak == offset: +                    ax.text(peak, ylim[0] + 0.3 * np.diff(ylim), "offset", rotation=90) +                else: +                    ax.text(peak, ylim[0] + 0.2 * np.diff(ylim), "peak", rotation=90) +            ax.set_xlabel("Sample") +            ax.set_ylabel("Conj. Product") +            ax.set_title("Moving Average") + +            ax = fig.add_subplot(4, 1, 3) +            ax.plot(tx_product_avg_min_filt) +            ylim = ax.get_ylim() +            for peak in peaks: +                ax.plot((peak, peak), (ylim[0], ylim[1])) +                if peak == offset: +                    ax.text(peak, ylim[0] + 0.3 * np.diff(ylim), "offset", rotation=90) +                else: +                    ax.text(peak, ylim[0] + 0.2 * np.diff(ylim), "peak", rotation=90) +            ax.set_xlabel("Sample") +            ax.set_ylabel("Conj. Product") +            ax.set_title("Min Filter") + +            ax = fig.add_subplot(4, 1, 4) +            tx_product_crop = tx_product[peaks[0]-50:peaks[0]+50] +            x = range(tx_product.shape[0])[peaks[0]-50:peaks[0]+50] +            ax.plot(x, tx_product_crop) +            ylim = ax.get_ylim() +            ax.plot((peaks[0], peaks[0]), (ylim[0], ylim[1])) +            ax.set_xlabel("Sample") +            ax.set_ylabel("Conj. Product") +            ax.set_title("Difference with shifted self") +            fig.tight_layout() + +        # "offset" measures where the shifted signal matches the +        # original signal. Therefore we have to subtract the size +        # of the shift to find the offset of the symbol start. +        return (offset + self.c.T_C) % self.c.T_S + +    def _remove_outliers(self, x, stds=5): +        deviation_from_mean = np.abs(x - np.mean(x)) +        inlier_idxs = deviation_from_mean < stds * np.std(x) +        x = x[inlier_idxs] +        return x + +    def _calc_delta_angle(self, fft, debug=False): +        # Introduce invariance against carrier +        angles = np.angle(fft) % (np.pi / 2.) + +        # Calculate Angle difference and compensate jumps +        deltas_angle = np.diff(angles) +        deltas_angle[deltas_angle > np.pi/4.] =\ +            deltas_angle[deltas_angle > np.pi/4.] - np.pi/2. +        deltas_angle[-deltas_angle > np.pi/4.] = \ +            deltas_angle[-deltas_angle > np.pi/4.] + np.pi/2. +        deltas_angle = self._remove_outliers(deltas_angle) + +        delta_angle = np.mean(deltas_angle) +        delta_angle_std = np.std(deltas_angle) +        if debug: +            plt.subplot(211) +            plt.plot(angles, 'p') +            plt.subplot(212) +            plt.plot(deltas_angle, 'p') +        return delta_angle + +    def _delta_angle_to_samples(self, angle): +        return - angle / self.c.phase_offset_per_sample + +    def _calc_sample_offset(self, sig, debug=False): +        assert sig.shape[0] == self.c.T_U,\ +            "Input length is not a Symbol without cyclic prefix" + +        fft = np.fft.fftshift(np.fft.fft(sig)) +        fft_crop = np.delete(fft[self.c.FFT_start:self.c.FFT_end], self.c.FFT_delete) +        delta_angle = self._calc_delta_angle(fft_crop, debug=debug) +        delta_sample = self._delta_angle_to_samples(delta_angle) +        delta_sample_int = np.round(delta_sample).astype(int) +        error = np.abs(delta_sample_int - delta_sample) +        if error > 0.1: +            raise RuntimeError("Could not calculate sample offset") +        return delta_sample_int + +    def calc_offset(self, tx): +        off_sym = self._calc_offset_to_first_symbol_without_prefix( +            tx, debug=False) +        off_sam = self._calc_sample_offset( +            tx[off_sym:off_sym + self.c.T_U]) +        off = (off_sym + off_sam) % self.c.T_S + +        assert self._calc_sample_offset(tx[off:off + self.c.T_U]) == 0, \ +            "Failed to calculate offset" +        return off + +    def crop_symbol_without_cyclic_prefix(self, tx): +        off = self.calc_offset(tx) +        return tx[ +               off: +               off+self.c.T_U +               ] + +# The MIT License (MIT) +# +# Copyright (c) 2017 Andreas Steger +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE.  | 
