summaryrefslogtreecommitdiffstats
path: root/dpd
diff options
context:
space:
mode:
authorandreas128 <Andreas>2017-09-10 18:19:55 +0200
committerandreas128 <Andreas>2017-09-10 18:19:55 +0200
commit6e580248099ce4995ff0ba957d9f9ba64cfdfff2 (patch)
treea6a75008b6091eca439404e2ee91c36a87ec9753 /dpd
parent086348ab4d788f8453ca78fed4f3f837779c9916 (diff)
downloaddabmod-6e580248099ce4995ff0ba957d9f9ba64cfdfff2.tar.gz
dabmod-6e580248099ce4995ff0ba957d9f9ba64cfdfff2.tar.bz2
dabmod-6e580248099ce4995ff0ba957d9f9ba64cfdfff2.zip
Add phase dpd, cleanup
Diffstat (limited to 'dpd')
-rw-r--r--dpd/src/Model.py297
1 files changed, 205 insertions, 92 deletions
diff --git a/dpd/src/Model.py b/dpd/src/Model.py
index 61c487e..4ccf0be 100644
--- a/dpd/src/Model.py
+++ b/dpd/src/Model.py
@@ -8,119 +8,174 @@
import datetime
import os
import logging
+
logging_path = os.path.dirname(logging.getLoggerClass().root.handlers[0].baseFilename)
import numpy as np
import matplotlib.pyplot as plt
+
class Model:
"""Calculates new coefficients using the measurement and the old
coefficients"""
- def __init__(self, c, SA, MER, coefs_am, coefs_pm, plot=False):
+ def __init__(self,
+ c,
+ SA,
+ MER,
+ coefs_am,
+ coefs_pm,
+ learning_rate_am=0.1,
+ learning_rate_pm=0.1,
+ plot=False):
self.c = c
self.SA = SA
self.MER = MER
+ self.learning_rate_am = learning_rate_am
+ self.learning_rate_pm = learning_rate_pm
+
self.coefs_am = coefs_am
- self.coefs_history = [coefs_am, ]
- self.mses = []
- self.errs = []
+ self.coefs_am_history = [coefs_am, ]
+ self.mses_am = []
+ self.errs_am = []
+
self.tx_mers = []
self.rx_mers = []
self.coefs_pm = coefs_pm
self.coefs_pm_history = [coefs_pm, ]
- self.errs_phase = [0, ]
+ self.errs_pm = []
- self.plot=plot
+ self.plot = plot
- def sample_uniformly(self, tx_dpd, rx_received, n_bins=4):
+ def sample_uniformly(self, tx_dpd, rx_received, n_bins=5):
"""This function returns tx and rx samples in a way
that the tx amplitudes have an approximate uniform
distribution with respect to the tx_dpd amplitudes"""
+ mask = np.logical_and((np.abs(tx_dpd) > 0.01), (np.abs(rx_received) > 0.01))
+ tx_dpd = tx_dpd[mask]
+ rx_received = rx_received[mask]
+
txframe_aligned_abs = np.abs(tx_dpd)
ccdf_min = 0
ccdf_max = np.max(txframe_aligned_abs)
tx_hist, ccdf_edges = np.histogram(txframe_aligned_abs,
bins=n_bins,
range=(ccdf_min, ccdf_max))
- n_choise = np.min(tx_hist)
+ n_choise = np.min(tx_hist)
tx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64)
rx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64)
for idx, bin in enumerate(tx_hist):
indices = np.where((txframe_aligned_abs >= ccdf_edges[idx]) &
- (txframe_aligned_abs <= ccdf_edges[idx+1]))[0]
- indices_choise = np.random.choice(indices, n_choise, replace=False)
- rx_choice[idx*n_choise:(idx+1)*n_choise] = rx_received[indices_choise]
- tx_choice[idx*n_choise:(idx+1)*n_choise] = tx_dpd[indices_choise]
+ (txframe_aligned_abs <= ccdf_edges[idx + 1]))[0]
+ indices_choise = np.random.choice(indices,
+ n_choise,
+ replace=False)
+ rx_choice[idx * n_choise:(idx + 1) * n_choise] = \
+ rx_received[indices_choise]
+ tx_choice[idx * n_choise:(idx + 1) * n_choise] = \
+ tx_dpd[indices_choise]
+
+ assert isinstance(rx_choice[0], np.complex64), \
+ "rx_choice is not complex64 but {}".format(rx_choice[0].dtype)
+ assert isinstance(tx_choice[0], np.complex64), \
+ "tx_choice is not complex64 but {}".format(tx_choice[0].dtype)
+
return tx_choice, rx_choice
- def amplitude_predistortion(self, sig):
+ def dpd_amplitude(self, sig, coefs=None):
+ if coefs is None:
+ coefs = self.coefs_am
+ assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype)
sig_abs = np.abs(sig)
A_sig = np.vstack([np.ones(sig_abs.shape),
- sig_abs ** 1,
- sig_abs ** 2,
- sig_abs ** 3,
- sig_abs ** 4,
- ]).T
- sig_dpd = sig * np.sum(A_sig * self.coefs_am, axis=1)
+ sig_abs ** 1,
+ sig_abs ** 2,
+ sig_abs ** 3,
+ sig_abs ** 4,
+ ]).T
+ sig_dpd = sig * np.sum(A_sig * coefs, axis=1)
return sig_dpd, A_sig
- def dpd_phase(self, tx):
- tx_abs = np.abs(tx)
- tx_A_complex = np.vstack([tx,
- tx * tx_abs ** 1,
- tx * tx_abs ** 2,
- tx * tx_abs ** 3,
- tx * tx_abs ** 4,
- ]).T
- tx_dpd = np.sum(tx_A_complex * self.coefs_pm, axis=1)
- return tx_dpd
-
- def get_next_coefs(self, tx_dpd, rx_received):
- normalization_error = np.abs(np.median(np.abs(tx_dpd)) - np.median(np.abs(rx_received)))/(np.median(np.abs(tx_dpd)) + np.median(np.abs(rx_received)))
- assert normalization_error < 0.01, "Non normalized signals"
- tx_choice, rx_choice = self.sample_uniformly(tx_dpd, rx_received)
+ def dpd_phase(self, sig, coefs=None):
+ if coefs is None:
+ coefs = self.coefs_pm
+ assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype)
+ sig_abs = np.abs(sig)
+ A_phase = np.vstack([np.ones(sig_abs.shape),
+ sig_abs ** 1,
+ sig_abs ** 2,
+ sig_abs ** 3,
+ sig_abs ** 4,
+ ]).T
+ phase_diff_est = np.sum(A_phase * coefs, axis=1)
+ return phase_diff_est, A_phase
- # Calculate new coefficients for AM/AM correction
- rx_dpd, rx_A = self.amplitude_predistortion(rx_choice)
+ def _next_am_coefficent(self, tx_choice, rx_choice):
+ """Calculate new coefficients for AM/AM correction"""
+ rx_dpd, rx_A = self.dpd_amplitude(rx_choice)
rx_dpd = rx_dpd * (
np.median(np.abs(tx_choice)) /
np.median(np.abs(rx_dpd)))
-
err = np.abs(rx_dpd) - np.abs(tx_choice)
- self.errs.append(np.mean(np.abs(err ** 2)))
-
- mse = np.mean(np.abs((rx_dpd - tx_choice)**2))
- self.mses.append(mse)
+ mse = np.mean(np.abs((rx_dpd - tx_choice) ** 2))
+ self.mses_am.append(mse)
+ self.errs_am.append(np.mean(err**2))
a_delta = np.linalg.lstsq(rx_A, err)[0]
- new_coefs = self.coefs_am - 0.1 * a_delta
- new_coefs = new_coefs * (self.coefs_am[0] / new_coefs[0])
- assert np.abs(self.coefs_am[0] / new_coefs[0] - 1) < 0.1, \
- "Too large change in first " \
- "coefficient. {}, {}".format(self.coefs_am[0], new_coefs[0])
- logging.debug("a_delta {}".format(a_delta))
- logging.debug("new coefs_am {}".format(new_coefs))
-
- rx_range = np.linspace(0, 1, num=100)
- rx_range_dpd = self.amplitude_predistortion(rx_range)[0]
- rx_range = rx_range[(rx_range_dpd > 0) & (rx_range_dpd < 2)]
- rx_range_dpd = rx_range_dpd[(rx_range_dpd > 0) & (rx_range_dpd < 2)]
+ new_coefs_am = self.coefs_am - self.learning_rate_am * a_delta
+ new_coefs_am = new_coefs_am * (self.coefs_am[0] / new_coefs_am[0])
+ return new_coefs_am
+
+ def _next_pm_coefficent(self, tx_choice, rx_choice):
+ """Calculate new coefficients for AM/PM correction
+ Assuming deviations smaller than pi/2"""
+ phase_diff_choice = np.angle(
+ (rx_choice * tx_choice.conjugate()) /
+ (np.abs(rx_choice) * np.abs(tx_choice))
+ )
+ plt.hist(phase_diff_choice)
+ plt.savefig('/tmp/hist_' + str(np.random.randint(0,1000)) + '.svg')
+ plt.clf()
+ phase_diff_est, phase_A = self.dpd_phase(rx_choice)
+ err_phase = phase_diff_est - phase_diff_choice
+ self.errs_pm.append(np.mean(np.abs(err_phase ** 2)))
+
+ p_delta = np.linalg.lstsq(phase_A, err_phase)[0]
+ new_coefs_pm = self.coefs_pm - self.learning_rate_pm * p_delta
+
+ return new_coefs_pm, phase_diff_choice
+
+ def get_next_coefs(self, tx_dpd, rx_received):
+ # Check data type
+ assert tx_dpd[0].dtype == np.complex64, \
+ "tx_dpd is not complex64 but {}".format(tx_dpd[0].dtype)
+ assert rx_received[0].dtype == np.complex64, \
+ "rx_received is not complex64 but {}".format(rx_received[0].dtype)
+ # Check if signals have same normalization
+ normalization_error = np.abs(np.median(np.abs(tx_dpd)) -
+ np.median(np.abs(rx_received))) / (
+ np.median(np.abs(tx_dpd)) + np.median(np.abs(rx_received)))
+ assert normalization_error < 0.01, "Non normalized signals"
+
+ tx_choice, rx_choice = self.sample_uniformly(tx_dpd, rx_received)
+ new_coefs_am = self._next_am_coefficent(tx_choice, rx_choice)
+ new_coefs_pm, phase_diff_choice = self._next_pm_coefficent(tx_choice, rx_choice)
logging.debug('txframe: min {:.2f}, max {:.2f}, ' \
'median {:.2f}; rxframe: min {:.2f}, max {:.2f}, ' \
- 'median {:.2f}; a_delta {}; new coefs_am {}'.format(
+ 'median {:.2f}; new coefs_am {};' \
+ 'new_coefs_pm {}'.format(
np.min(np.abs(tx_dpd)),
np.max(np.abs(tx_dpd)),
np.median(np.abs(tx_dpd)),
np.min(np.abs(rx_choice)),
np.max(np.abs(rx_choice)),
np.median(np.abs(rx_choice)),
- a_delta,
- new_coefs))
+ new_coefs_am,
+ new_coefs_pm))
if logging.getLogger().getEffectiveLevel() == logging.DEBUG and self.plot:
off = self.SA.calc_offset(tx_dpd)
@@ -133,41 +188,37 @@ class Model:
dt = datetime.datetime.now().isoformat()
fig_path = logging_path + "/" + dt + "_Model.svg"
- fig = plt.figure(figsize=(3*6, 6))
+ fig = plt.figure(figsize=(2 * 6, 2 * 6))
+
+ i_sub = 1
- ax = plt.subplot(2,3,1)
+ ax = plt.subplot(4, 2, i_sub)
+ i_sub += 1
ax.plot(np.abs(tx_dpd[:128]),
label="TX sent",
linestyle=":")
ax.plot(np.abs(rx_received[:128]),
label="RX received",
color="red")
- ax.set_title("Synchronized Signals of Iteration {}".format(len(self.coefs_history)))
+ ax.set_title("Synchronized Signals of Iteration {}"
+ .format(len(self.coefs_am_history)))
ax.set_xlabel("Samples")
ax.set_ylabel("Amplitude")
- ax.text(0, 0, "TX (max {:01.3f}, mean {:01.3f}, median {:01.3f})".format(
+ ax.text(0, 0, "TX (max {:01.3f}, mean {:01.3f}, " \
+ "median {:01.3f})".format(
np.max(np.abs(tx_dpd)),
np.mean(np.abs(tx_dpd)),
np.median(np.abs(tx_dpd))
- ), size = 8)
+ ), size=8)
ax.legend(loc=4)
- ax = plt.subplot(2,3,2)
- ax.scatter(
- np.abs(tx_choice),
- np.abs(rx_choice),
- s=0.1)
- ax.plot(rx_range_dpd / self.coefs_am[0], rx_range, linewidth=0.25)
- ax.set_title("Amplifier Characteristic")
- ax.set_xlabel("TX Amplitude")
- ax.set_ylabel("RX Amplitude")
-
- ax = plt.subplot(2,3,3)
+ ax = plt.subplot(4, 2, i_sub)
+ i_sub += 1
ccdf_min, ccdf_max = 0, 1
tx_hist, ccdf_edges = np.histogram(np.abs(tx_dpd),
- bins=60,
- range=(ccdf_min, ccdf_max))
- tx_hist_normalized = tx_hist.astype(float)/np.sum(tx_hist)
+ bins=60,
+ range=(ccdf_min, ccdf_max))
+ tx_hist_normalized = tx_hist.astype(float) / np.sum(tx_hist)
ccdf = 1.0 - np.cumsum(tx_hist_normalized)
ax.semilogy(ccdf_edges[:-1], ccdf, label="CCDF")
ax.semilogy(ccdf_edges[:-1],
@@ -175,23 +226,22 @@ class Model:
label="Histogram",
drawstyle='steps')
ax.legend(loc=4)
- ax.set_ylim(1e-5,2)
+ ax.set_ylim(1e-5, 2)
ax.set_title("Complementary Cumulative Distribution Function")
ax.set_xlabel("TX Amplitude")
ax.set_ylabel("Ratio of Samples larger than x")
- ax = plt.subplot(2,3,4)
- coefs_history = np.array(self.coefs_history)
- for idx, coef_hist in enumerate(coefs_history.T):
- ax.plot(coef_hist,
- label="Coef {}".format(idx),
- linewidth=0.5)
+ ax = plt.subplot(4, 2, i_sub)
+ i_sub += 1
+ ax.semilogy(np.array(self.mses_am) + 1e-10, label="log(MSE)")
+ ax.semilogy(np.array(self.errs_am) + 1e-10, label="log(ERR)")
ax.legend(loc=4)
- ax.set_title("AM/AM Coefficient History")
+ ax.set_title("MSE History")
ax.set_xlabel("Iterations")
- ax.set_ylabel("Coefficient Value")
+ ax.set_ylabel("MSE")
- ax = plt.subplot(2,3,5)
+ ax = plt.subplot(4, 2, i_sub)
+ i_sub += 1
ax.plot(self.tx_mers, label="TX MER")
ax.plot(self.rx_mers, label="RX MER")
ax.legend(loc=4)
@@ -199,20 +249,83 @@ class Model:
ax.set_xlabel("Iterations")
ax.set_ylabel("MER")
- ax = plt.subplot(2,3,6)
- ax.plot(self.mses, label="MSE")
- ax.plot(self.errs, label="ERR")
+ ax = plt.subplot(4, 2, i_sub)
+ rx_range = np.linspace(0, 1, num=100, dtype=np.complex64)
+ rx_range_dpd = self.dpd_amplitude(rx_range)[0]
+ rx_range_dpd_new = self.dpd_amplitude(rx_range, new_coefs_am)[0]
+ i_sub += 1
+ ax.scatter(
+ np.abs(tx_choice),
+ np.abs(rx_choice),
+ s=0.1)
+ ax.plot(rx_range_dpd / self.coefs_am[0], rx_range, linewidth=0.25, label="current")
+ ax.plot(rx_range_dpd_new / self.coefs_am[0], rx_range, linewidth=0.25, label="next")
+ ax.set_ylim(0, 1)
+ ax.set_xlim(0, 1)
+ ax.legend()
+ ax.set_title("Amplifier Characteristic")
+ ax.set_xlabel("TX Amplitude")
+ ax.set_ylabel("RX Amplitude")
+
+ ax = plt.subplot(4, 2, i_sub)
+ i_sub += 1
+ coefs_am_history = np.array(self.coefs_am_history)
+ for idx, coef_hist in enumerate(coefs_am_history.T):
+ ax.plot(coef_hist,
+ label="Coef {}".format(idx),
+ linewidth=0.5)
+ ax.legend(loc=4)
+ ax.set_title("AM/AM Coefficient History")
+ ax.set_xlabel("Iterations")
+ ax.set_ylabel("Coefficient Value")
+
+ phase_range = np.linspace(0, 1, num=100, dtype=np.complex64)
+ phase_range_dpd = self.dpd_phase(phase_range)[0]
+ phase_range_dpd_new = self.dpd_phase(phase_range,
+ coefs=new_coefs_pm)[0]
+ ax = plt.subplot(4, 2, i_sub)
+ i_sub += 1
+ ax.scatter(
+ np.abs(tx_choice),
+ np.rad2deg(phase_diff_choice),
+ s=0.1)
+ ax.plot(
+ np.abs(phase_range),
+ np.rad2deg(phase_range_dpd),
+ linewidth=0.25,
+ label="current")
+ ax.plot(
+ np.abs(phase_range),
+ np.rad2deg(phase_range_dpd_new),
+ linewidth=0.25,
+ label="next")
+ ax.set_ylim(-60, 60)
+ ax.set_xlim(0, 1)
+ ax.legend()
+ ax.set_title("Amplifier Characteristic")
+ ax.set_xlabel("TX Amplitude")
+ ax.set_ylabel("Phase Difference")
+
+ ax = plt.subplot(4, 2, i_sub)
+ i_sub += 1
+ coefs_pm_history = np.array(self.coefs_pm_history)
+ for idx, coef_phase_hist in enumerate(coefs_pm_history.T):
+ ax.plot(coef_phase_hist,
+ label="Coef {}".format(idx),
+ linewidth=0.5)
ax.legend(loc=4)
- ax.set_title("MSE History")
+ ax.set_title("AM/PM Coefficient History")
ax.set_xlabel("Iterations")
- ax.set_ylabel("MSE")
+ ax.set_ylabel("Coefficient Value")
fig.tight_layout()
fig.savefig(fig_path)
fig.clf()
- self.coefs_am = new_coefs
- self.coefs_history.append(self.coefs_am)
+ self.coefs_am = new_coefs_am
+ self.coefs_am_history.append(self.coefs_am)
+ self.coefs_pm = new_coefs_pm
+ self.coefs_pm_history.append(self.coefs_pm)
return self.coefs_am, self.coefs_pm
# The MIT License (MIT)