summaryrefslogtreecommitdiffstats
path: root/dpd
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-09-13 18:55:39 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-09-13 18:55:39 +0200
commit1ca5368f547c429bf0d86dac78162310e1d2b032 (patch)
tree6c6949b3ecf235ab18a6d21af6d3ad6105190d67 /dpd
parent4f9372c130960559a0bba13828a810eb57e30123 (diff)
downloaddabmod-1ca5368f547c429bf0d86dac78162310e1d2b032.tar.gz
dabmod-1ca5368f547c429bf0d86dac78162310e1d2b032.tar.bz2
dabmod-1ca5368f547c429bf0d86dac78162310e1d2b032.zip
Add LUT predistorter
Diffstat (limited to 'dpd')
-rw-r--r--dpd/README.md23
-rw-r--r--dpd/dpd.ini6
-rw-r--r--dpd/lut.coef64
-rwxr-xr-xdpd/main.py51
-rw-r--r--dpd/poly.coef3
-rw-r--r--dpd/src/Adapt.py89
-rw-r--r--dpd/src/Model.py259
7 files changed, 328 insertions, 167 deletions
diff --git a/dpd/README.md b/dpd/README.md
index b5a6b81..83a2986 100644
--- a/dpd/README.md
+++ b/dpd/README.md
@@ -8,7 +8,9 @@ Concept
ODR-DabMod makes outgoing TX samples and feedback RX samples available for an external tool. This
external tool can request a buffer of samples for analysis, can calculate coefficients for the
-polynomial predistorter in ODR-DabMod and load the new coefficients using the remote control.
+predistorter in ODR-DabMod and load the new coefficients using the remote control.
+
+The predistorter in ODR-DabMod supports two modes: polynomial and lookup table.
The *dpd/main.py* script is the entry point for the *DPD Calculation Engine* into which these
features will be implemented. The tool uses modules from the *dpd/src/* folder:
@@ -43,10 +45,21 @@ The coef file contains the polynomial coefficients used in the predistorter. The
very similar to the filtertaps file used in the FIR filter. It is a text-based format that can
easily be inspected and edited in a text editor.
-The first line contains the number of coefficients as an integer. The second and third lines contain
-the real, respectively the imaginary parts of the first coefficient. Fourth and fifth lines give the
-second coefficient, and so on. The file therefore contains 2xN + 1 lines if it contains N
-coefficients.
+The first line contains an integer that defines the predistorter to be used:
+1 for polynomial, 2 for lookup table.
+
+For the polynomial, the subsequent line contains the number of coefficients
+as an integer. The second and third lines contain the real, respectively the
+imaginary parts of the first coefficient. Fourth and fifth lines give the
+second coefficient, and so on. The file therefore contains 1 + 1 + 2xN lines if
+it contains N coefficients.
+
+For the lookup table, the subsequent line contains a float scalefactor that is
+applied to the samples in order to bring them into the range of 32-bit unsigned
+integer. Then, the next pair of lines contains real and imaginary part of the first
+lookup-table entry, which is multiplied to samples in first range. Then it's
+followed by 31 other pairs. The entries are complex values close to 1 + 0j.
+The file therefore contains 1 + 1 + 2xN lines if it contains N coefficients.
TODO
----
diff --git a/dpd/dpd.ini b/dpd/dpd.ini
index 1bc51de..9a76393 100644
--- a/dpd/dpd.ini
+++ b/dpd/dpd.ini
@@ -10,8 +10,8 @@ filelog=1
filename=/tmp/dabmod.log
[input]
-transport=tcp
-source=localhost:9200
+transport=file
+source=/home/bram/dab/mmbtools-aux/eti/buddard.eti
[modulator]
gainmode=var
@@ -31,7 +31,7 @@ polycoeffile=dpd/poly.coef
[output]
# to prepare a file for the dpd/iq_file_server.py script,
# use output=file
-output=uhd
+output=file
[fileoutput]
filename=dpd.iq
diff --git a/dpd/lut.coef b/dpd/lut.coef
new file mode 100644
index 0000000..a198d56
--- /dev/null
+++ b/dpd/lut.coef
@@ -0,0 +1,64 @@
+2
+4294967296
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
+1
+0
diff --git a/dpd/main.py b/dpd/main.py
index 5e67c90..de3453e 100755
--- a/dpd/main.py
+++ b/dpd/main.py
@@ -13,7 +13,7 @@ predistortion module of ODR-DabMod."""
import datetime
import os
import time
-
+import sys
import matplotlib
matplotlib.use('GTKAgg')
@@ -82,8 +82,8 @@ parser.add_argument('--samps', default='81920',
parser.add_argument('-i', '--iterations', default='1',
help='Number of iterations to run',
required=False)
-parser.add_argument('-l', '--load-poly',
- help='Load existing polynomial',
+parser.add_argument('-L', '--lut',
+ help='Use lookup table instead of polynomial predistorter',
action="store_true")
cli_args = parser.parse_args()
@@ -105,12 +105,13 @@ c = src.const.const(samplerate)
meas = Measure.Measure(samplerate, port, num_req)
adapt = Adapt.Adapt(port_rc, coef_path)
-coefs_am, coefs_pm = adapt.get_coefs()
-if cli_args.load_poly:
- model = Model.Model(c, SA, MER, coefs_am, coefs_pm, plot=True)
+dpddata = adapt.get_predistorter()
+
+if cli_args.lut:
+ model = Model.LutModel(c, SA, MER, plot=True)
else:
- model = Model.Model(c, SA, MER, [1.0, 0, 0, 0, 0], [0, 0, 0, 0, 0], plot=True)
-adapt.set_coefs(model.coefs_am, model.coefs_pm)
+ model = Model.PolyModel(c, SA, MER, None, None, plot=True)
+adapt.set_predistorter(model.get_dpd_data())
adapt.set_digital_gain(digital_gain)
adapt.set_txgain(txgain)
adapt.set_rxgain(rxgain)
@@ -118,13 +119,28 @@ adapt.set_rxgain(rxgain)
tx_gain = adapt.get_txgain()
rx_gain = adapt.get_rxgain()
digital_gain = adapt.get_digital_gain()
-dpd_coefs_am, dpd_coefs_pm = adapt.get_coefs()
-logging.info(
- "TX gain {}, RX gain {}, dpd_coefs_am {},"
- " dpd_coefs_pm {}, digital_gain {}".format(
- tx_gain, rx_gain, dpd_coefs_am, dpd_coefs_pm, digital_gain
+
+dpddata = adapt.get_coefs()
+if dpddata[0] == "poly":
+ coefs_am = dpddata[1]
+ coefs_pm = dpddata[2]
+ logging.info(
+ "TX gain {}, RX gain {}, dpd_coefs_am {},"
+ " dpd_coefs_pm {}, digital_gain {}".format(
+ tx_gain, rx_gain, coefs_am, coefs_pm, digital_gain
+ )
)
-)
+elif dpddata[0] == "lut":
+ scalefactor = dpddata[1]
+ lut = dpddata[2]
+ logging.info(
+ "TX gain {}, RX gain {}, LUT scalefactor {},"
+ " LUT {}, digital_gain {}".format(
+ tx_gain, rx_gain, scalefactor, lut, digital_gain
+ )
+ )
+else:
+ logging.error("Unknown dpd data format {}".format(dpddata[0]))
tx_agc = TX_Agc.TX_Agc(adapt)
@@ -141,8 +157,8 @@ for i in range(num_iter):
if tx_agc.adapt_if_necessary(txframe_aligned):
continue
- coefs_am, coefs_pm = model.get_next_coefs(txframe_aligned, rxframe_aligned)
- adapt.set_coefs(coefs_am, coefs_pm)
+ model.train(txframe_aligned, rxframe_aligned)
+ adapt.set_predistorter(model.get_dpd_data())
off = SA.calc_offset(txframe_aligned)
tx_mer = MER.calc_mer(txframe_aligned[off:off + c.T_U])
@@ -155,7 +171,8 @@ for i in range(num_iter):
# The MIT License (MIT)
#
-# Copyright (c) 2017 Andreas Steger, Matthias P. Braendli
+# Copyright (c) 2017 Andreas Steger
+# Copyright (c) 2017 Matthias P. Braendli
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
diff --git a/dpd/poly.coef b/dpd/poly.coef
index f65e369..913507a 100644
--- a/dpd/poly.coef
+++ b/dpd/poly.coef
@@ -1,5 +1,6 @@
-5
1
+5
+1.0
0
0
0
diff --git a/dpd/src/Adapt.py b/dpd/src/Adapt.py
index b4042d6..f21bb87 100644
--- a/dpd/src/Adapt.py
+++ b/dpd/src/Adapt.py
@@ -13,6 +13,10 @@ import zmq
import logging
import numpy as np
+LUT_LEN=32
+FORMAT_POLY=1
+FORMAT_LUT=2
+
class Adapt:
"""Uses the ZMQ remote control to change parameters of the DabMod
@@ -126,45 +130,74 @@ class Adapt:
# TODO handle failure
return float(self.send_receive("get gain digital")[0])
- def _read_coef_file(self, path):
+ def get_predistorter(self):
"""Load the coefficients from the file in the format given in the README,
- return ([AM coef], [PM coef])"""
- coefs_am_out = []
- coefs_pm_out = []
- f = open(path, 'r')
+ return ("poly", [AM coef], [PM coef]) or ("lut", scalefactor, [LUT entries])"""
+ f = open(self.coef_path, 'r')
lines = f.readlines()
- n_coefs = int(lines[0])
- coefs = [float(l) for l in lines[1:]]
- i = 0
- for c in coefs:
- if i < n_coefs:
- coefs_am_out.append(c)
- elif i < 2*n_coefs:
- coefs_pm_out.append(c)
- else:
- raise ValueError(
- "Incorrect coef file format: too many coefficients in {}, should be {}, coefs are {}"
- .format(path, n_coefs, coefs))
- i += 1
- f.close()
- return (coefs_am_out, coefs_pm_out)
-
- def get_coefs(self):
- return self._read_coef_file(self.coef_path)
-
- def _write_coef_file(self, coefs_am, coefs_pm, path):
+ predistorter_format = int(lines[0])
+ if predistorter_format == FORMAT_POLY:
+ coefs_am_out = []
+ coefs_pm_out = []
+ n_coefs = int(lines[1])
+ coefs = [float(l) for l in lines[2:]]
+ i = 0
+ for c in coefs:
+ if i < n_coefs:
+ coefs_am_out.append(c)
+ elif i < 2*n_coefs:
+ coefs_pm_out.append(c)
+ else:
+ raise ValueError(
+ "Incorrect coef file format: too many coefficients in {}, should be {}, coefs are {}"
+ .format(path, n_coefs, coefs))
+ i += 1
+ f.close()
+ return ("poly", coefs_am_out, coefs_pm_out)
+ elif predistorter_format == FORMAT_LUT:
+ scalefactor = int(lines[1])
+ coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32)
+ coefs = coefs.reshape((-1, 2))
+ lut = coefs[..., 0] + 1j * coefs[..., 1]
+ if len(lut) != LUT_LEN:
+ raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN))
+ return ("lut", scalefactor, lut)
+ else:
+ raise ValueError("Unknown predistorter format {}".format(predistorter_format))
+
+ def _write_poly_coef_file(self, coefs_am, coefs_pm, path):
assert(len(coefs_am) == len(coefs_pm))
f = open(path, 'w')
- f.write("{}\n".format(len(coefs_am)))
+ f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am)))
for coef in coefs_am:
f.write("{}\n".format(coef))
for coef in coefs_pm:
f.write("{}\n".format(coef))
f.close()
- def set_coefs(self, coefs_am, coefs_pm):
- self._write_coef_file(coefs_am, coefs_pm, self.coef_path)
+ def _write_lut_file(self, scalefactor, lut, path):
+ assert(len(lut) == LUT_LEN)
+
+ f = open(path, 'w')
+ f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor))
+ for coef in lut:
+ f.write("{}\n{}\n".format(coef.real, coef.imag))
+ f.close()
+
+ def set_predistorter(self, dpddata):
+ """Update the predistorter data in the modulator. Takes the same
+ tuple format as argument than the one returned get_predistorter()"""
+ if dpddata[0] == "poly":
+ coefs_am = dpddata[1]
+ coefs_pm = dpddata[2]
+ self._write_poly_coef_file(coefs_am, coefs_pm, self.coef_path)
+ elif dpddata[0] == "lut":
+ scalefactor = dpddata[1]
+ lut = dpddata[2]
+ self._write_lut_file(scalefactor, lut, self.coef_path)
+ else:
+ raise ValueError("Unknown predistorter '{}'".format(dpddata[0]))
self.send_receive("set memlesspoly coeffile {}".format(self.coef_path))
# The MIT License (MIT)
diff --git a/dpd/src/Model.py b/dpd/src/Model.py
index 827027a..a23f0ce 100644
--- a/dpd/src/Model.py
+++ b/dpd/src/Model.py
@@ -15,7 +15,7 @@ import numpy as np
import matplotlib.pyplot as plt
from sklearn import linear_model
-class Model:
+class PolyModel:
"""Calculates new coefficients using the measurement and the old
coefficients"""
@@ -28,6 +28,7 @@ class Model:
learning_rate_am=1.,
learning_rate_pm=1.,
plot=False):
+ logging.debug("Initialising Poly Model")
self.c = c
self.SA = SA
self.MER = MER
@@ -35,7 +36,10 @@ class Model:
self.learning_rate_am = learning_rate_am
self.learning_rate_pm = learning_rate_pm
- self.coefs_am = coefs_am
+ if coefs_am is None:
+ self.coefs_am = [1.0, 0, 0, 0, 0]
+ else:
+ self.coefs_am = coefs_am
self.coefs_am_history = [coefs_am, ]
self.mses_am = []
self.errs_am = []
@@ -43,116 +47,17 @@ class Model:
self.tx_mers = []
self.rx_mers = []
- self.coefs_pm = coefs_pm
+ if coefs_pm is None:
+ self.coefs_pm = [0, 0, 0, 0, 0]
+ else:
+ self.coefs_pm = coefs_pm
self.coefs_pm_history = [coefs_pm, ]
self.errs_pm = []
self.plot = plot
- def sample_uniformly(self, tx_dpd, rx_received, n_bins=5):
- """This function returns tx and rx samples in a way
- that the tx amplitudes have an approximate uniform
- distribution with respect to the tx_dpd amplitudes"""
- mask = np.logical_and((np.abs(tx_dpd) > 0.01), (np.abs(rx_received) > 0.01))
- tx_dpd = tx_dpd[mask]
- rx_received = rx_received[mask]
-
- txframe_aligned_abs = np.abs(tx_dpd)
- ccdf_min = 0
- ccdf_max = np.max(txframe_aligned_abs)
- tx_hist, ccdf_edges = np.histogram(txframe_aligned_abs,
- bins=n_bins,
- range=(ccdf_min, ccdf_max))
- n_choise = np.min(tx_hist)
- tx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64)
- rx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64)
-
- for idx, bin in enumerate(tx_hist):
- indices = np.where((txframe_aligned_abs >= ccdf_edges[idx]) &
- (txframe_aligned_abs <= ccdf_edges[idx + 1]))[0]
- indices_choise = np.random.choice(indices,
- n_choise,
- replace=False)
- rx_choice[idx * n_choise:(idx + 1) * n_choise] = \
- rx_received[indices_choise]
- tx_choice[idx * n_choise:(idx + 1) * n_choise] = \
- tx_dpd[indices_choise]
-
- assert isinstance(rx_choice[0], np.complex64), \
- "rx_choice is not complex64 but {}".format(rx_choice[0].dtype)
- assert isinstance(tx_choice[0], np.complex64), \
- "tx_choice is not complex64 but {}".format(tx_choice[0].dtype)
-
- return tx_choice, rx_choice
-
- def dpd_amplitude(self, sig, coefs=None):
- if coefs is None:
- coefs = self.coefs_am
- assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype)
- sig_abs = np.abs(sig)
- A_sig = np.vstack([np.ones(sig_abs.shape),
- sig_abs ** 1,
- sig_abs ** 2,
- sig_abs ** 3,
- sig_abs ** 4,
- ]).T
- sig_dpd = sig * np.sum(A_sig * coefs, axis=1)
- return sig_dpd, A_sig
-
- def dpd_phase(self, sig, coefs=None):
- if coefs is None:
- coefs = self.coefs_pm
- assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype)
- sig_abs = np.abs(sig)
- A_phase = np.vstack([np.ones(sig_abs.shape),
- sig_abs ** 1,
- sig_abs ** 2,
- sig_abs ** 3,
- sig_abs ** 4,
- ]).T
- phase_diff_est = np.sum(A_phase * coefs, axis=1)
- return phase_diff_est, A_phase
-
- def _next_am_coefficent(self, tx_choice, rx_choice):
- """Calculate new coefficients for AM/AM correction"""
- rx_dpd, rx_A = self.dpd_amplitude(rx_choice)
- rx_dpd = rx_dpd * (
- np.median(np.abs(tx_choice)) /
- np.median(np.abs(rx_dpd)))
- err = np.abs(rx_dpd) - np.abs(tx_choice)
- mse = np.mean(np.abs((rx_dpd - tx_choice) ** 2))
- self.mses_am.append(mse)
- self.errs_am.append(np.mean(err**2))
-
- reg = linear_model.Ridge(alpha=0.00001)
- reg.fit(rx_A, err)
- a_delta = reg.coef_
- new_coefs_am = self.coefs_am - self.learning_rate_am * a_delta
- new_coefs_am = new_coefs_am * (self.coefs_am[0] / new_coefs_am[0])
- return new_coefs_am
-
- def _next_pm_coefficent(self, tx_choice, rx_choice):
- """Calculate new coefficients for AM/PM correction
- Assuming deviations smaller than pi/2"""
- phase_diff_choice = np.angle(
- (rx_choice * tx_choice.conjugate()) /
- (np.abs(rx_choice) * np.abs(tx_choice))
- )
- plt.hist(phase_diff_choice)
- plt.savefig('/tmp/hist_' + str(np.random.randint(0,1000)) + '.svg')
- plt.clf()
- phase_diff_est, phase_A = self.dpd_phase(rx_choice)
- err_phase = phase_diff_est - phase_diff_choice
- self.errs_pm.append(np.mean(np.abs(err_phase ** 2)))
-
- reg = linear_model.Ridge(alpha=0.00001)
- reg.fit(phase_A, err_phase)
- p_delta = reg.coef_
- new_coefs_pm = self.coefs_pm - self.learning_rate_pm * p_delta
-
- return new_coefs_pm, phase_diff_choice
-
- def get_next_coefs(self, tx_dpd, rx_received):
+ def train(self, tx_dpd, rx_received):
+ """Give new training data to the model"""
# Check data type
assert tx_dpd[0].dtype == np.complex64, \
"tx_dpd is not complex64 but {}".format(tx_dpd[0].dtype)
@@ -164,7 +69,7 @@ class Model:
np.median(np.abs(tx_dpd)) + np.median(np.abs(rx_received)))
assert normalization_error < 0.01, "Non normalized signals"
- tx_choice, rx_choice = self.sample_uniformly(tx_dpd, rx_received)
+ tx_choice, rx_choice = self._sample_uniformly(tx_dpd, rx_received)
new_coefs_am = self._next_am_coefficent(tx_choice, rx_choice)
new_coefs_pm, phase_diff_choice = self._next_pm_coefficent(tx_choice, rx_choice)
@@ -255,8 +160,8 @@ class Model:
ax = plt.subplot(4, 2, i_sub)
rx_range = np.linspace(0, 1, num=100, dtype=np.complex64)
- rx_range_dpd = self.dpd_amplitude(rx_range)[0]
- rx_range_dpd_new = self.dpd_amplitude(rx_range, new_coefs_am)[0]
+ rx_range_dpd = self._dpd_amplitude(rx_range)[0]
+ rx_range_dpd_new = self._dpd_amplitude(rx_range, new_coefs_am)[0]
i_sub += 1
ax.scatter(
np.abs(tx_choice),
@@ -284,8 +189,8 @@ class Model:
ax.set_ylabel("Coefficient Value")
phase_range = np.linspace(0, 1, num=100, dtype=np.complex64)
- phase_range_dpd = self.dpd_phase(phase_range)[0]
- phase_range_dpd_new = self.dpd_phase(phase_range,
+ phase_range_dpd = self._dpd_phase(phase_range)[0]
+ phase_range_dpd_new = self._dpd_phase(phase_range,
coefs=new_coefs_pm)[0]
ax = plt.subplot(4, 2, i_sub)
i_sub += 1
@@ -330,11 +235,139 @@ class Model:
self.coefs_am_history.append(self.coefs_am)
self.coefs_pm = new_coefs_pm
self.coefs_pm_history.append(self.coefs_pm)
- return self.coefs_am, self.coefs_pm
+
+ def get_dpd_data(self):
+ return "poly", self.coefs_am, self.coefs_pm
+
+ def _sample_uniformly(self, tx_dpd, rx_received, n_bins=5):
+ """This function returns tx and rx samples in a way
+ that the tx amplitudes have an approximate uniform
+ distribution with respect to the tx_dpd amplitudes"""
+ mask = np.logical_and((np.abs(tx_dpd) > 0.01), (np.abs(rx_received) > 0.01))
+ tx_dpd = tx_dpd[mask]
+ rx_received = rx_received[mask]
+
+ txframe_aligned_abs = np.abs(tx_dpd)
+ ccdf_min = 0
+ ccdf_max = np.max(txframe_aligned_abs)
+ tx_hist, ccdf_edges = np.histogram(txframe_aligned_abs,
+ bins=n_bins,
+ range=(ccdf_min, ccdf_max))
+ n_choise = np.min(tx_hist)
+ tx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64)
+ rx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64)
+
+ for idx, bin in enumerate(tx_hist):
+ indices = np.where((txframe_aligned_abs >= ccdf_edges[idx]) &
+ (txframe_aligned_abs <= ccdf_edges[idx + 1]))[0]
+ indices_choise = np.random.choice(indices,
+ n_choise,
+ replace=False)
+ rx_choice[idx * n_choise:(idx + 1) * n_choise] = \
+ rx_received[indices_choise]
+ tx_choice[idx * n_choise:(idx + 1) * n_choise] = \
+ tx_dpd[indices_choise]
+
+ assert isinstance(rx_choice[0], np.complex64), \
+ "rx_choice is not complex64 but {}".format(rx_choice[0].dtype)
+ assert isinstance(tx_choice[0], np.complex64), \
+ "tx_choice is not complex64 but {}".format(tx_choice[0].dtype)
+
+ return tx_choice, rx_choice
+
+ def _dpd_amplitude(self, sig, coefs=None):
+ if coefs is None:
+ coefs = self.coefs_am
+ assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype)
+ sig_abs = np.abs(sig)
+ A_sig = np.vstack([np.ones(sig_abs.shape),
+ sig_abs ** 1,
+ sig_abs ** 2,
+ sig_abs ** 3,
+ sig_abs ** 4,
+ ]).T
+ sig_dpd = sig * np.sum(A_sig * coefs, axis=1)
+ return sig_dpd, A_sig
+
+ def _dpd_phase(self, sig, coefs=None):
+ if coefs is None:
+ coefs = self.coefs_pm
+ assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype)
+ sig_abs = np.abs(sig)
+ A_phase = np.vstack([np.ones(sig_abs.shape),
+ sig_abs ** 1,
+ sig_abs ** 2,
+ sig_abs ** 3,
+ sig_abs ** 4,
+ ]).T
+ phase_diff_est = np.sum(A_phase * coefs, axis=1)
+ return phase_diff_est, A_phase
+
+ def _next_am_coefficent(self, tx_choice, rx_choice):
+ """Calculate new coefficients for AM/AM correction"""
+ rx_dpd, rx_A = self._dpd_amplitude(rx_choice)
+ rx_dpd = rx_dpd * (
+ np.median(np.abs(tx_choice)) /
+ np.median(np.abs(rx_dpd)))
+ err = np.abs(rx_dpd) - np.abs(tx_choice)
+ mse = np.mean(np.abs((rx_dpd - tx_choice) ** 2))
+ self.mses_am.append(mse)
+ self.errs_am.append(np.mean(err**2))
+
+ reg = linear_model.Ridge(alpha=0.00001)
+ reg.fit(rx_A, err)
+ a_delta = reg.coef_
+ new_coefs_am = self.coefs_am - self.learning_rate_am * a_delta
+ new_coefs_am = new_coefs_am * (self.coefs_am[0] / new_coefs_am[0])
+ return new_coefs_am
+
+ def _next_pm_coefficent(self, tx_choice, rx_choice):
+ """Calculate new coefficients for AM/PM correction
+ Assuming deviations smaller than pi/2"""
+ phase_diff_choice = np.angle(
+ (rx_choice * tx_choice.conjugate()) /
+ (np.abs(rx_choice) * np.abs(tx_choice))
+ )
+ plt.hist(phase_diff_choice)
+ plt.savefig('/tmp/hist_' + str(np.random.randint(0,1000)) + '.svg')
+ plt.clf()
+ phase_diff_est, phase_A = self._dpd_phase(rx_choice)
+ err_phase = phase_diff_est - phase_diff_choice
+ self.errs_pm.append(np.mean(np.abs(err_phase ** 2)))
+
+ reg = linear_model.Ridge(alpha=0.00001)
+ reg.fit(phase_A, err_phase)
+ p_delta = reg.coef_
+ new_coefs_pm = self.coefs_pm - self.learning_rate_pm * p_delta
+
+ return new_coefs_pm, phase_diff_choice
+
+class LutModel:
+ """Implements a model that calculates lookup table coefficients"""
+
+ def __init__(self,
+ c,
+ SA,
+ MER,
+ learning_rate=1.,
+ plot=False):
+ logging.debug("Initialising LUT Model")
+ self.c = c
+ self.SA = SA
+ self.MER = MER
+ self.learning_rate = learning_rate
+ self.plot = plot
+
+ def train(self, tx_dpd, rx_received):
+ pass
+
+ def get_dpd_data(self):
+ return ("lut", np.ones(32, dtype=np.complex64))
# The MIT License (MIT)
#
# Copyright (c) 2017 Andreas Steger
+# Copyright (c) 2017 Matthias P. Braendli
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal