diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-09-13 18:55:39 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-09-13 18:55:39 +0200 |
commit | 1ca5368f547c429bf0d86dac78162310e1d2b032 (patch) | |
tree | 6c6949b3ecf235ab18a6d21af6d3ad6105190d67 /dpd | |
parent | 4f9372c130960559a0bba13828a810eb57e30123 (diff) | |
download | dabmod-1ca5368f547c429bf0d86dac78162310e1d2b032.tar.gz dabmod-1ca5368f547c429bf0d86dac78162310e1d2b032.tar.bz2 dabmod-1ca5368f547c429bf0d86dac78162310e1d2b032.zip |
Add LUT predistorter
Diffstat (limited to 'dpd')
-rw-r--r-- | dpd/README.md | 23 | ||||
-rw-r--r-- | dpd/dpd.ini | 6 | ||||
-rw-r--r-- | dpd/lut.coef | 64 | ||||
-rwxr-xr-x | dpd/main.py | 51 | ||||
-rw-r--r-- | dpd/poly.coef | 3 | ||||
-rw-r--r-- | dpd/src/Adapt.py | 89 | ||||
-rw-r--r-- | dpd/src/Model.py | 259 |
7 files changed, 328 insertions, 167 deletions
diff --git a/dpd/README.md b/dpd/README.md index b5a6b81..83a2986 100644 --- a/dpd/README.md +++ b/dpd/README.md @@ -8,7 +8,9 @@ Concept ODR-DabMod makes outgoing TX samples and feedback RX samples available for an external tool. This external tool can request a buffer of samples for analysis, can calculate coefficients for the -polynomial predistorter in ODR-DabMod and load the new coefficients using the remote control. +predistorter in ODR-DabMod and load the new coefficients using the remote control. + +The predistorter in ODR-DabMod supports two modes: polynomial and lookup table. The *dpd/main.py* script is the entry point for the *DPD Calculation Engine* into which these features will be implemented. The tool uses modules from the *dpd/src/* folder: @@ -43,10 +45,21 @@ The coef file contains the polynomial coefficients used in the predistorter. The very similar to the filtertaps file used in the FIR filter. It is a text-based format that can easily be inspected and edited in a text editor. -The first line contains the number of coefficients as an integer. The second and third lines contain -the real, respectively the imaginary parts of the first coefficient. Fourth and fifth lines give the -second coefficient, and so on. The file therefore contains 2xN + 1 lines if it contains N -coefficients. +The first line contains an integer that defines the predistorter to be used: +1 for polynomial, 2 for lookup table. + +For the polynomial, the subsequent line contains the number of coefficients +as an integer. The second and third lines contain the real, respectively the +imaginary parts of the first coefficient. Fourth and fifth lines give the +second coefficient, and so on. The file therefore contains 1 + 1 + 2xN lines if +it contains N coefficients. + +For the lookup table, the subsequent line contains a float scalefactor that is +applied to the samples in order to bring them into the range of 32-bit unsigned +integer. Then, the next pair of lines contains real and imaginary part of the first +lookup-table entry, which is multiplied to samples in first range. Then it's +followed by 31 other pairs. The entries are complex values close to 1 + 0j. +The file therefore contains 1 + 1 + 2xN lines if it contains N coefficients. TODO ---- diff --git a/dpd/dpd.ini b/dpd/dpd.ini index 1bc51de..9a76393 100644 --- a/dpd/dpd.ini +++ b/dpd/dpd.ini @@ -10,8 +10,8 @@ filelog=1 filename=/tmp/dabmod.log [input] -transport=tcp -source=localhost:9200 +transport=file +source=/home/bram/dab/mmbtools-aux/eti/buddard.eti [modulator] gainmode=var @@ -31,7 +31,7 @@ polycoeffile=dpd/poly.coef [output] # to prepare a file for the dpd/iq_file_server.py script, # use output=file -output=uhd +output=file [fileoutput] filename=dpd.iq diff --git a/dpd/lut.coef b/dpd/lut.coef new file mode 100644 index 0000000..a198d56 --- /dev/null +++ b/dpd/lut.coef @@ -0,0 +1,64 @@ +2 +4294967296 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 diff --git a/dpd/main.py b/dpd/main.py index 5e67c90..de3453e 100755 --- a/dpd/main.py +++ b/dpd/main.py @@ -13,7 +13,7 @@ predistortion module of ODR-DabMod.""" import datetime import os import time - +import sys import matplotlib matplotlib.use('GTKAgg') @@ -82,8 +82,8 @@ parser.add_argument('--samps', default='81920', parser.add_argument('-i', '--iterations', default='1', help='Number of iterations to run', required=False) -parser.add_argument('-l', '--load-poly', - help='Load existing polynomial', +parser.add_argument('-L', '--lut', + help='Use lookup table instead of polynomial predistorter', action="store_true") cli_args = parser.parse_args() @@ -105,12 +105,13 @@ c = src.const.const(samplerate) meas = Measure.Measure(samplerate, port, num_req) adapt = Adapt.Adapt(port_rc, coef_path) -coefs_am, coefs_pm = adapt.get_coefs() -if cli_args.load_poly: - model = Model.Model(c, SA, MER, coefs_am, coefs_pm, plot=True) +dpddata = adapt.get_predistorter() + +if cli_args.lut: + model = Model.LutModel(c, SA, MER, plot=True) else: - model = Model.Model(c, SA, MER, [1.0, 0, 0, 0, 0], [0, 0, 0, 0, 0], plot=True) -adapt.set_coefs(model.coefs_am, model.coefs_pm) + model = Model.PolyModel(c, SA, MER, None, None, plot=True) +adapt.set_predistorter(model.get_dpd_data()) adapt.set_digital_gain(digital_gain) adapt.set_txgain(txgain) adapt.set_rxgain(rxgain) @@ -118,13 +119,28 @@ adapt.set_rxgain(rxgain) tx_gain = adapt.get_txgain() rx_gain = adapt.get_rxgain() digital_gain = adapt.get_digital_gain() -dpd_coefs_am, dpd_coefs_pm = adapt.get_coefs() -logging.info( - "TX gain {}, RX gain {}, dpd_coefs_am {}," - " dpd_coefs_pm {}, digital_gain {}".format( - tx_gain, rx_gain, dpd_coefs_am, dpd_coefs_pm, digital_gain + +dpddata = adapt.get_coefs() +if dpddata[0] == "poly": + coefs_am = dpddata[1] + coefs_pm = dpddata[2] + logging.info( + "TX gain {}, RX gain {}, dpd_coefs_am {}," + " dpd_coefs_pm {}, digital_gain {}".format( + tx_gain, rx_gain, coefs_am, coefs_pm, digital_gain + ) ) -) +elif dpddata[0] == "lut": + scalefactor = dpddata[1] + lut = dpddata[2] + logging.info( + "TX gain {}, RX gain {}, LUT scalefactor {}," + " LUT {}, digital_gain {}".format( + tx_gain, rx_gain, scalefactor, lut, digital_gain + ) + ) +else: + logging.error("Unknown dpd data format {}".format(dpddata[0])) tx_agc = TX_Agc.TX_Agc(adapt) @@ -141,8 +157,8 @@ for i in range(num_iter): if tx_agc.adapt_if_necessary(txframe_aligned): continue - coefs_am, coefs_pm = model.get_next_coefs(txframe_aligned, rxframe_aligned) - adapt.set_coefs(coefs_am, coefs_pm) + model.train(txframe_aligned, rxframe_aligned) + adapt.set_predistorter(model.get_dpd_data()) off = SA.calc_offset(txframe_aligned) tx_mer = MER.calc_mer(txframe_aligned[off:off + c.T_U]) @@ -155,7 +171,8 @@ for i in range(num_iter): # The MIT License (MIT) # -# Copyright (c) 2017 Andreas Steger, Matthias P. Braendli +# Copyright (c) 2017 Andreas Steger +# Copyright (c) 2017 Matthias P. Braendli # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal diff --git a/dpd/poly.coef b/dpd/poly.coef index f65e369..913507a 100644 --- a/dpd/poly.coef +++ b/dpd/poly.coef @@ -1,5 +1,6 @@ -5 1 +5 +1.0 0 0 0 diff --git a/dpd/src/Adapt.py b/dpd/src/Adapt.py index b4042d6..f21bb87 100644 --- a/dpd/src/Adapt.py +++ b/dpd/src/Adapt.py @@ -13,6 +13,10 @@ import zmq import logging import numpy as np +LUT_LEN=32 +FORMAT_POLY=1 +FORMAT_LUT=2 + class Adapt: """Uses the ZMQ remote control to change parameters of the DabMod @@ -126,45 +130,74 @@ class Adapt: # TODO handle failure return float(self.send_receive("get gain digital")[0]) - def _read_coef_file(self, path): + def get_predistorter(self): """Load the coefficients from the file in the format given in the README, - return ([AM coef], [PM coef])""" - coefs_am_out = [] - coefs_pm_out = [] - f = open(path, 'r') + return ("poly", [AM coef], [PM coef]) or ("lut", scalefactor, [LUT entries])""" + f = open(self.coef_path, 'r') lines = f.readlines() - n_coefs = int(lines[0]) - coefs = [float(l) for l in lines[1:]] - i = 0 - for c in coefs: - if i < n_coefs: - coefs_am_out.append(c) - elif i < 2*n_coefs: - coefs_pm_out.append(c) - else: - raise ValueError( - "Incorrect coef file format: too many coefficients in {}, should be {}, coefs are {}" - .format(path, n_coefs, coefs)) - i += 1 - f.close() - return (coefs_am_out, coefs_pm_out) - - def get_coefs(self): - return self._read_coef_file(self.coef_path) - - def _write_coef_file(self, coefs_am, coefs_pm, path): + predistorter_format = int(lines[0]) + if predistorter_format == FORMAT_POLY: + coefs_am_out = [] + coefs_pm_out = [] + n_coefs = int(lines[1]) + coefs = [float(l) for l in lines[2:]] + i = 0 + for c in coefs: + if i < n_coefs: + coefs_am_out.append(c) + elif i < 2*n_coefs: + coefs_pm_out.append(c) + else: + raise ValueError( + "Incorrect coef file format: too many coefficients in {}, should be {}, coefs are {}" + .format(path, n_coefs, coefs)) + i += 1 + f.close() + return ("poly", coefs_am_out, coefs_pm_out) + elif predistorter_format == FORMAT_LUT: + scalefactor = int(lines[1]) + coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32) + coefs = coefs.reshape((-1, 2)) + lut = coefs[..., 0] + 1j * coefs[..., 1] + if len(lut) != LUT_LEN: + raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN)) + return ("lut", scalefactor, lut) + else: + raise ValueError("Unknown predistorter format {}".format(predistorter_format)) + + def _write_poly_coef_file(self, coefs_am, coefs_pm, path): assert(len(coefs_am) == len(coefs_pm)) f = open(path, 'w') - f.write("{}\n".format(len(coefs_am))) + f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am))) for coef in coefs_am: f.write("{}\n".format(coef)) for coef in coefs_pm: f.write("{}\n".format(coef)) f.close() - def set_coefs(self, coefs_am, coefs_pm): - self._write_coef_file(coefs_am, coefs_pm, self.coef_path) + def _write_lut_file(self, scalefactor, lut, path): + assert(len(lut) == LUT_LEN) + + f = open(path, 'w') + f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor)) + for coef in lut: + f.write("{}\n{}\n".format(coef.real, coef.imag)) + f.close() + + def set_predistorter(self, dpddata): + """Update the predistorter data in the modulator. Takes the same + tuple format as argument than the one returned get_predistorter()""" + if dpddata[0] == "poly": + coefs_am = dpddata[1] + coefs_pm = dpddata[2] + self._write_poly_coef_file(coefs_am, coefs_pm, self.coef_path) + elif dpddata[0] == "lut": + scalefactor = dpddata[1] + lut = dpddata[2] + self._write_lut_file(scalefactor, lut, self.coef_path) + else: + raise ValueError("Unknown predistorter '{}'".format(dpddata[0])) self.send_receive("set memlesspoly coeffile {}".format(self.coef_path)) # The MIT License (MIT) diff --git a/dpd/src/Model.py b/dpd/src/Model.py index 827027a..a23f0ce 100644 --- a/dpd/src/Model.py +++ b/dpd/src/Model.py @@ -15,7 +15,7 @@ import numpy as np import matplotlib.pyplot as plt from sklearn import linear_model -class Model: +class PolyModel: """Calculates new coefficients using the measurement and the old coefficients""" @@ -28,6 +28,7 @@ class Model: learning_rate_am=1., learning_rate_pm=1., plot=False): + logging.debug("Initialising Poly Model") self.c = c self.SA = SA self.MER = MER @@ -35,7 +36,10 @@ class Model: self.learning_rate_am = learning_rate_am self.learning_rate_pm = learning_rate_pm - self.coefs_am = coefs_am + if coefs_am is None: + self.coefs_am = [1.0, 0, 0, 0, 0] + else: + self.coefs_am = coefs_am self.coefs_am_history = [coefs_am, ] self.mses_am = [] self.errs_am = [] @@ -43,116 +47,17 @@ class Model: self.tx_mers = [] self.rx_mers = [] - self.coefs_pm = coefs_pm + if coefs_pm is None: + self.coefs_pm = [0, 0, 0, 0, 0] + else: + self.coefs_pm = coefs_pm self.coefs_pm_history = [coefs_pm, ] self.errs_pm = [] self.plot = plot - def sample_uniformly(self, tx_dpd, rx_received, n_bins=5): - """This function returns tx and rx samples in a way - that the tx amplitudes have an approximate uniform - distribution with respect to the tx_dpd amplitudes""" - mask = np.logical_and((np.abs(tx_dpd) > 0.01), (np.abs(rx_received) > 0.01)) - tx_dpd = tx_dpd[mask] - rx_received = rx_received[mask] - - txframe_aligned_abs = np.abs(tx_dpd) - ccdf_min = 0 - ccdf_max = np.max(txframe_aligned_abs) - tx_hist, ccdf_edges = np.histogram(txframe_aligned_abs, - bins=n_bins, - range=(ccdf_min, ccdf_max)) - n_choise = np.min(tx_hist) - tx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64) - rx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64) - - for idx, bin in enumerate(tx_hist): - indices = np.where((txframe_aligned_abs >= ccdf_edges[idx]) & - (txframe_aligned_abs <= ccdf_edges[idx + 1]))[0] - indices_choise = np.random.choice(indices, - n_choise, - replace=False) - rx_choice[idx * n_choise:(idx + 1) * n_choise] = \ - rx_received[indices_choise] - tx_choice[idx * n_choise:(idx + 1) * n_choise] = \ - tx_dpd[indices_choise] - - assert isinstance(rx_choice[0], np.complex64), \ - "rx_choice is not complex64 but {}".format(rx_choice[0].dtype) - assert isinstance(tx_choice[0], np.complex64), \ - "tx_choice is not complex64 but {}".format(tx_choice[0].dtype) - - return tx_choice, rx_choice - - def dpd_amplitude(self, sig, coefs=None): - if coefs is None: - coefs = self.coefs_am - assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype) - sig_abs = np.abs(sig) - A_sig = np.vstack([np.ones(sig_abs.shape), - sig_abs ** 1, - sig_abs ** 2, - sig_abs ** 3, - sig_abs ** 4, - ]).T - sig_dpd = sig * np.sum(A_sig * coefs, axis=1) - return sig_dpd, A_sig - - def dpd_phase(self, sig, coefs=None): - if coefs is None: - coefs = self.coefs_pm - assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype) - sig_abs = np.abs(sig) - A_phase = np.vstack([np.ones(sig_abs.shape), - sig_abs ** 1, - sig_abs ** 2, - sig_abs ** 3, - sig_abs ** 4, - ]).T - phase_diff_est = np.sum(A_phase * coefs, axis=1) - return phase_diff_est, A_phase - - def _next_am_coefficent(self, tx_choice, rx_choice): - """Calculate new coefficients for AM/AM correction""" - rx_dpd, rx_A = self.dpd_amplitude(rx_choice) - rx_dpd = rx_dpd * ( - np.median(np.abs(tx_choice)) / - np.median(np.abs(rx_dpd))) - err = np.abs(rx_dpd) - np.abs(tx_choice) - mse = np.mean(np.abs((rx_dpd - tx_choice) ** 2)) - self.mses_am.append(mse) - self.errs_am.append(np.mean(err**2)) - - reg = linear_model.Ridge(alpha=0.00001) - reg.fit(rx_A, err) - a_delta = reg.coef_ - new_coefs_am = self.coefs_am - self.learning_rate_am * a_delta - new_coefs_am = new_coefs_am * (self.coefs_am[0] / new_coefs_am[0]) - return new_coefs_am - - def _next_pm_coefficent(self, tx_choice, rx_choice): - """Calculate new coefficients for AM/PM correction - Assuming deviations smaller than pi/2""" - phase_diff_choice = np.angle( - (rx_choice * tx_choice.conjugate()) / - (np.abs(rx_choice) * np.abs(tx_choice)) - ) - plt.hist(phase_diff_choice) - plt.savefig('/tmp/hist_' + str(np.random.randint(0,1000)) + '.svg') - plt.clf() - phase_diff_est, phase_A = self.dpd_phase(rx_choice) - err_phase = phase_diff_est - phase_diff_choice - self.errs_pm.append(np.mean(np.abs(err_phase ** 2))) - - reg = linear_model.Ridge(alpha=0.00001) - reg.fit(phase_A, err_phase) - p_delta = reg.coef_ - new_coefs_pm = self.coefs_pm - self.learning_rate_pm * p_delta - - return new_coefs_pm, phase_diff_choice - - def get_next_coefs(self, tx_dpd, rx_received): + def train(self, tx_dpd, rx_received): + """Give new training data to the model""" # Check data type assert tx_dpd[0].dtype == np.complex64, \ "tx_dpd is not complex64 but {}".format(tx_dpd[0].dtype) @@ -164,7 +69,7 @@ class Model: np.median(np.abs(tx_dpd)) + np.median(np.abs(rx_received))) assert normalization_error < 0.01, "Non normalized signals" - tx_choice, rx_choice = self.sample_uniformly(tx_dpd, rx_received) + tx_choice, rx_choice = self._sample_uniformly(tx_dpd, rx_received) new_coefs_am = self._next_am_coefficent(tx_choice, rx_choice) new_coefs_pm, phase_diff_choice = self._next_pm_coefficent(tx_choice, rx_choice) @@ -255,8 +160,8 @@ class Model: ax = plt.subplot(4, 2, i_sub) rx_range = np.linspace(0, 1, num=100, dtype=np.complex64) - rx_range_dpd = self.dpd_amplitude(rx_range)[0] - rx_range_dpd_new = self.dpd_amplitude(rx_range, new_coefs_am)[0] + rx_range_dpd = self._dpd_amplitude(rx_range)[0] + rx_range_dpd_new = self._dpd_amplitude(rx_range, new_coefs_am)[0] i_sub += 1 ax.scatter( np.abs(tx_choice), @@ -284,8 +189,8 @@ class Model: ax.set_ylabel("Coefficient Value") phase_range = np.linspace(0, 1, num=100, dtype=np.complex64) - phase_range_dpd = self.dpd_phase(phase_range)[0] - phase_range_dpd_new = self.dpd_phase(phase_range, + phase_range_dpd = self._dpd_phase(phase_range)[0] + phase_range_dpd_new = self._dpd_phase(phase_range, coefs=new_coefs_pm)[0] ax = plt.subplot(4, 2, i_sub) i_sub += 1 @@ -330,11 +235,139 @@ class Model: self.coefs_am_history.append(self.coefs_am) self.coefs_pm = new_coefs_pm self.coefs_pm_history.append(self.coefs_pm) - return self.coefs_am, self.coefs_pm + + def get_dpd_data(self): + return "poly", self.coefs_am, self.coefs_pm + + def _sample_uniformly(self, tx_dpd, rx_received, n_bins=5): + """This function returns tx and rx samples in a way + that the tx amplitudes have an approximate uniform + distribution with respect to the tx_dpd amplitudes""" + mask = np.logical_and((np.abs(tx_dpd) > 0.01), (np.abs(rx_received) > 0.01)) + tx_dpd = tx_dpd[mask] + rx_received = rx_received[mask] + + txframe_aligned_abs = np.abs(tx_dpd) + ccdf_min = 0 + ccdf_max = np.max(txframe_aligned_abs) + tx_hist, ccdf_edges = np.histogram(txframe_aligned_abs, + bins=n_bins, + range=(ccdf_min, ccdf_max)) + n_choise = np.min(tx_hist) + tx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64) + rx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64) + + for idx, bin in enumerate(tx_hist): + indices = np.where((txframe_aligned_abs >= ccdf_edges[idx]) & + (txframe_aligned_abs <= ccdf_edges[idx + 1]))[0] + indices_choise = np.random.choice(indices, + n_choise, + replace=False) + rx_choice[idx * n_choise:(idx + 1) * n_choise] = \ + rx_received[indices_choise] + tx_choice[idx * n_choise:(idx + 1) * n_choise] = \ + tx_dpd[indices_choise] + + assert isinstance(rx_choice[0], np.complex64), \ + "rx_choice is not complex64 but {}".format(rx_choice[0].dtype) + assert isinstance(tx_choice[0], np.complex64), \ + "tx_choice is not complex64 but {}".format(tx_choice[0].dtype) + + return tx_choice, rx_choice + + def _dpd_amplitude(self, sig, coefs=None): + if coefs is None: + coefs = self.coefs_am + assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype) + sig_abs = np.abs(sig) + A_sig = np.vstack([np.ones(sig_abs.shape), + sig_abs ** 1, + sig_abs ** 2, + sig_abs ** 3, + sig_abs ** 4, + ]).T + sig_dpd = sig * np.sum(A_sig * coefs, axis=1) + return sig_dpd, A_sig + + def _dpd_phase(self, sig, coefs=None): + if coefs is None: + coefs = self.coefs_pm + assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype) + sig_abs = np.abs(sig) + A_phase = np.vstack([np.ones(sig_abs.shape), + sig_abs ** 1, + sig_abs ** 2, + sig_abs ** 3, + sig_abs ** 4, + ]).T + phase_diff_est = np.sum(A_phase * coefs, axis=1) + return phase_diff_est, A_phase + + def _next_am_coefficent(self, tx_choice, rx_choice): + """Calculate new coefficients for AM/AM correction""" + rx_dpd, rx_A = self._dpd_amplitude(rx_choice) + rx_dpd = rx_dpd * ( + np.median(np.abs(tx_choice)) / + np.median(np.abs(rx_dpd))) + err = np.abs(rx_dpd) - np.abs(tx_choice) + mse = np.mean(np.abs((rx_dpd - tx_choice) ** 2)) + self.mses_am.append(mse) + self.errs_am.append(np.mean(err**2)) + + reg = linear_model.Ridge(alpha=0.00001) + reg.fit(rx_A, err) + a_delta = reg.coef_ + new_coefs_am = self.coefs_am - self.learning_rate_am * a_delta + new_coefs_am = new_coefs_am * (self.coefs_am[0] / new_coefs_am[0]) + return new_coefs_am + + def _next_pm_coefficent(self, tx_choice, rx_choice): + """Calculate new coefficients for AM/PM correction + Assuming deviations smaller than pi/2""" + phase_diff_choice = np.angle( + (rx_choice * tx_choice.conjugate()) / + (np.abs(rx_choice) * np.abs(tx_choice)) + ) + plt.hist(phase_diff_choice) + plt.savefig('/tmp/hist_' + str(np.random.randint(0,1000)) + '.svg') + plt.clf() + phase_diff_est, phase_A = self._dpd_phase(rx_choice) + err_phase = phase_diff_est - phase_diff_choice + self.errs_pm.append(np.mean(np.abs(err_phase ** 2))) + + reg = linear_model.Ridge(alpha=0.00001) + reg.fit(phase_A, err_phase) + p_delta = reg.coef_ + new_coefs_pm = self.coefs_pm - self.learning_rate_pm * p_delta + + return new_coefs_pm, phase_diff_choice + +class LutModel: + """Implements a model that calculates lookup table coefficients""" + + def __init__(self, + c, + SA, + MER, + learning_rate=1., + plot=False): + logging.debug("Initialising LUT Model") + self.c = c + self.SA = SA + self.MER = MER + self.learning_rate = learning_rate + self.plot = plot + + def train(self, tx_dpd, rx_received): + pass + + def get_dpd_data(self): + return ("lut", np.ones(32, dtype=np.complex64)) # The MIT License (MIT) # # Copyright (c) 2017 Andreas Steger +# Copyright (c) 2017 Matthias P. Braendli # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal |