diff options
author | andreas128 <Andreas> | 2017-09-10 18:19:55 +0200 |
---|---|---|
committer | andreas128 <Andreas> | 2017-09-10 18:19:55 +0200 |
commit | 6e580248099ce4995ff0ba957d9f9ba64cfdfff2 (patch) | |
tree | a6a75008b6091eca439404e2ee91c36a87ec9753 /dpd/src | |
parent | 086348ab4d788f8453ca78fed4f3f837779c9916 (diff) | |
download | dabmod-6e580248099ce4995ff0ba957d9f9ba64cfdfff2.tar.gz dabmod-6e580248099ce4995ff0ba957d9f9ba64cfdfff2.tar.bz2 dabmod-6e580248099ce4995ff0ba957d9f9ba64cfdfff2.zip |
Add phase dpd, cleanup
Diffstat (limited to 'dpd/src')
-rw-r--r-- | dpd/src/Model.py | 297 |
1 files changed, 205 insertions, 92 deletions
diff --git a/dpd/src/Model.py b/dpd/src/Model.py index 61c487e..4ccf0be 100644 --- a/dpd/src/Model.py +++ b/dpd/src/Model.py @@ -8,119 +8,174 @@ import datetime import os import logging + logging_path = os.path.dirname(logging.getLoggerClass().root.handlers[0].baseFilename) import numpy as np import matplotlib.pyplot as plt + class Model: """Calculates new coefficients using the measurement and the old coefficients""" - def __init__(self, c, SA, MER, coefs_am, coefs_pm, plot=False): + def __init__(self, + c, + SA, + MER, + coefs_am, + coefs_pm, + learning_rate_am=0.1, + learning_rate_pm=0.1, + plot=False): self.c = c self.SA = SA self.MER = MER + self.learning_rate_am = learning_rate_am + self.learning_rate_pm = learning_rate_pm + self.coefs_am = coefs_am - self.coefs_history = [coefs_am, ] - self.mses = [] - self.errs = [] + self.coefs_am_history = [coefs_am, ] + self.mses_am = [] + self.errs_am = [] + self.tx_mers = [] self.rx_mers = [] self.coefs_pm = coefs_pm self.coefs_pm_history = [coefs_pm, ] - self.errs_phase = [0, ] + self.errs_pm = [] - self.plot=plot + self.plot = plot - def sample_uniformly(self, tx_dpd, rx_received, n_bins=4): + def sample_uniformly(self, tx_dpd, rx_received, n_bins=5): """This function returns tx and rx samples in a way that the tx amplitudes have an approximate uniform distribution with respect to the tx_dpd amplitudes""" + mask = np.logical_and((np.abs(tx_dpd) > 0.01), (np.abs(rx_received) > 0.01)) + tx_dpd = tx_dpd[mask] + rx_received = rx_received[mask] + txframe_aligned_abs = np.abs(tx_dpd) ccdf_min = 0 ccdf_max = np.max(txframe_aligned_abs) tx_hist, ccdf_edges = np.histogram(txframe_aligned_abs, bins=n_bins, range=(ccdf_min, ccdf_max)) - n_choise = np.min(tx_hist) + n_choise = np.min(tx_hist) tx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64) rx_choice = np.zeros(n_choise * n_bins, dtype=np.complex64) for idx, bin in enumerate(tx_hist): indices = np.where((txframe_aligned_abs >= ccdf_edges[idx]) & - (txframe_aligned_abs <= ccdf_edges[idx+1]))[0] - indices_choise = np.random.choice(indices, n_choise, replace=False) - rx_choice[idx*n_choise:(idx+1)*n_choise] = rx_received[indices_choise] - tx_choice[idx*n_choise:(idx+1)*n_choise] = tx_dpd[indices_choise] + (txframe_aligned_abs <= ccdf_edges[idx + 1]))[0] + indices_choise = np.random.choice(indices, + n_choise, + replace=False) + rx_choice[idx * n_choise:(idx + 1) * n_choise] = \ + rx_received[indices_choise] + tx_choice[idx * n_choise:(idx + 1) * n_choise] = \ + tx_dpd[indices_choise] + + assert isinstance(rx_choice[0], np.complex64), \ + "rx_choice is not complex64 but {}".format(rx_choice[0].dtype) + assert isinstance(tx_choice[0], np.complex64), \ + "tx_choice is not complex64 but {}".format(tx_choice[0].dtype) + return tx_choice, rx_choice - def amplitude_predistortion(self, sig): + def dpd_amplitude(self, sig, coefs=None): + if coefs is None: + coefs = self.coefs_am + assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype) sig_abs = np.abs(sig) A_sig = np.vstack([np.ones(sig_abs.shape), - sig_abs ** 1, - sig_abs ** 2, - sig_abs ** 3, - sig_abs ** 4, - ]).T - sig_dpd = sig * np.sum(A_sig * self.coefs_am, axis=1) + sig_abs ** 1, + sig_abs ** 2, + sig_abs ** 3, + sig_abs ** 4, + ]).T + sig_dpd = sig * np.sum(A_sig * coefs, axis=1) return sig_dpd, A_sig - def dpd_phase(self, tx): - tx_abs = np.abs(tx) - tx_A_complex = np.vstack([tx, - tx * tx_abs ** 1, - tx * tx_abs ** 2, - tx * tx_abs ** 3, - tx * tx_abs ** 4, - ]).T - tx_dpd = np.sum(tx_A_complex * self.coefs_pm, axis=1) - return tx_dpd - - def get_next_coefs(self, tx_dpd, rx_received): - normalization_error = np.abs(np.median(np.abs(tx_dpd)) - np.median(np.abs(rx_received)))/(np.median(np.abs(tx_dpd)) + np.median(np.abs(rx_received))) - assert normalization_error < 0.01, "Non normalized signals" - tx_choice, rx_choice = self.sample_uniformly(tx_dpd, rx_received) + def dpd_phase(self, sig, coefs=None): + if coefs is None: + coefs = self.coefs_pm + assert isinstance(sig[0], np.complex64), "Sig is not complex64 but {}".format(sig[0].dtype) + sig_abs = np.abs(sig) + A_phase = np.vstack([np.ones(sig_abs.shape), + sig_abs ** 1, + sig_abs ** 2, + sig_abs ** 3, + sig_abs ** 4, + ]).T + phase_diff_est = np.sum(A_phase * coefs, axis=1) + return phase_diff_est, A_phase - # Calculate new coefficients for AM/AM correction - rx_dpd, rx_A = self.amplitude_predistortion(rx_choice) + def _next_am_coefficent(self, tx_choice, rx_choice): + """Calculate new coefficients for AM/AM correction""" + rx_dpd, rx_A = self.dpd_amplitude(rx_choice) rx_dpd = rx_dpd * ( np.median(np.abs(tx_choice)) / np.median(np.abs(rx_dpd))) - err = np.abs(rx_dpd) - np.abs(tx_choice) - self.errs.append(np.mean(np.abs(err ** 2))) - - mse = np.mean(np.abs((rx_dpd - tx_choice)**2)) - self.mses.append(mse) + mse = np.mean(np.abs((rx_dpd - tx_choice) ** 2)) + self.mses_am.append(mse) + self.errs_am.append(np.mean(err**2)) a_delta = np.linalg.lstsq(rx_A, err)[0] - new_coefs = self.coefs_am - 0.1 * a_delta - new_coefs = new_coefs * (self.coefs_am[0] / new_coefs[0]) - assert np.abs(self.coefs_am[0] / new_coefs[0] - 1) < 0.1, \ - "Too large change in first " \ - "coefficient. {}, {}".format(self.coefs_am[0], new_coefs[0]) - logging.debug("a_delta {}".format(a_delta)) - logging.debug("new coefs_am {}".format(new_coefs)) - - rx_range = np.linspace(0, 1, num=100) - rx_range_dpd = self.amplitude_predistortion(rx_range)[0] - rx_range = rx_range[(rx_range_dpd > 0) & (rx_range_dpd < 2)] - rx_range_dpd = rx_range_dpd[(rx_range_dpd > 0) & (rx_range_dpd < 2)] + new_coefs_am = self.coefs_am - self.learning_rate_am * a_delta + new_coefs_am = new_coefs_am * (self.coefs_am[0] / new_coefs_am[0]) + return new_coefs_am + + def _next_pm_coefficent(self, tx_choice, rx_choice): + """Calculate new coefficients for AM/PM correction + Assuming deviations smaller than pi/2""" + phase_diff_choice = np.angle( + (rx_choice * tx_choice.conjugate()) / + (np.abs(rx_choice) * np.abs(tx_choice)) + ) + plt.hist(phase_diff_choice) + plt.savefig('/tmp/hist_' + str(np.random.randint(0,1000)) + '.svg') + plt.clf() + phase_diff_est, phase_A = self.dpd_phase(rx_choice) + err_phase = phase_diff_est - phase_diff_choice + self.errs_pm.append(np.mean(np.abs(err_phase ** 2))) + + p_delta = np.linalg.lstsq(phase_A, err_phase)[0] + new_coefs_pm = self.coefs_pm - self.learning_rate_pm * p_delta + + return new_coefs_pm, phase_diff_choice + + def get_next_coefs(self, tx_dpd, rx_received): + # Check data type + assert tx_dpd[0].dtype == np.complex64, \ + "tx_dpd is not complex64 but {}".format(tx_dpd[0].dtype) + assert rx_received[0].dtype == np.complex64, \ + "rx_received is not complex64 but {}".format(rx_received[0].dtype) + # Check if signals have same normalization + normalization_error = np.abs(np.median(np.abs(tx_dpd)) - + np.median(np.abs(rx_received))) / ( + np.median(np.abs(tx_dpd)) + np.median(np.abs(rx_received))) + assert normalization_error < 0.01, "Non normalized signals" + + tx_choice, rx_choice = self.sample_uniformly(tx_dpd, rx_received) + new_coefs_am = self._next_am_coefficent(tx_choice, rx_choice) + new_coefs_pm, phase_diff_choice = self._next_pm_coefficent(tx_choice, rx_choice) logging.debug('txframe: min {:.2f}, max {:.2f}, ' \ 'median {:.2f}; rxframe: min {:.2f}, max {:.2f}, ' \ - 'median {:.2f}; a_delta {}; new coefs_am {}'.format( + 'median {:.2f}; new coefs_am {};' \ + 'new_coefs_pm {}'.format( np.min(np.abs(tx_dpd)), np.max(np.abs(tx_dpd)), np.median(np.abs(tx_dpd)), np.min(np.abs(rx_choice)), np.max(np.abs(rx_choice)), np.median(np.abs(rx_choice)), - a_delta, - new_coefs)) + new_coefs_am, + new_coefs_pm)) if logging.getLogger().getEffectiveLevel() == logging.DEBUG and self.plot: off = self.SA.calc_offset(tx_dpd) @@ -133,41 +188,37 @@ class Model: dt = datetime.datetime.now().isoformat() fig_path = logging_path + "/" + dt + "_Model.svg" - fig = plt.figure(figsize=(3*6, 6)) + fig = plt.figure(figsize=(2 * 6, 2 * 6)) + + i_sub = 1 - ax = plt.subplot(2,3,1) + ax = plt.subplot(4, 2, i_sub) + i_sub += 1 ax.plot(np.abs(tx_dpd[:128]), label="TX sent", linestyle=":") ax.plot(np.abs(rx_received[:128]), label="RX received", color="red") - ax.set_title("Synchronized Signals of Iteration {}".format(len(self.coefs_history))) + ax.set_title("Synchronized Signals of Iteration {}" + .format(len(self.coefs_am_history))) ax.set_xlabel("Samples") ax.set_ylabel("Amplitude") - ax.text(0, 0, "TX (max {:01.3f}, mean {:01.3f}, median {:01.3f})".format( + ax.text(0, 0, "TX (max {:01.3f}, mean {:01.3f}, " \ + "median {:01.3f})".format( np.max(np.abs(tx_dpd)), np.mean(np.abs(tx_dpd)), np.median(np.abs(tx_dpd)) - ), size = 8) + ), size=8) ax.legend(loc=4) - ax = plt.subplot(2,3,2) - ax.scatter( - np.abs(tx_choice), - np.abs(rx_choice), - s=0.1) - ax.plot(rx_range_dpd / self.coefs_am[0], rx_range, linewidth=0.25) - ax.set_title("Amplifier Characteristic") - ax.set_xlabel("TX Amplitude") - ax.set_ylabel("RX Amplitude") - - ax = plt.subplot(2,3,3) + ax = plt.subplot(4, 2, i_sub) + i_sub += 1 ccdf_min, ccdf_max = 0, 1 tx_hist, ccdf_edges = np.histogram(np.abs(tx_dpd), - bins=60, - range=(ccdf_min, ccdf_max)) - tx_hist_normalized = tx_hist.astype(float)/np.sum(tx_hist) + bins=60, + range=(ccdf_min, ccdf_max)) + tx_hist_normalized = tx_hist.astype(float) / np.sum(tx_hist) ccdf = 1.0 - np.cumsum(tx_hist_normalized) ax.semilogy(ccdf_edges[:-1], ccdf, label="CCDF") ax.semilogy(ccdf_edges[:-1], @@ -175,23 +226,22 @@ class Model: label="Histogram", drawstyle='steps') ax.legend(loc=4) - ax.set_ylim(1e-5,2) + ax.set_ylim(1e-5, 2) ax.set_title("Complementary Cumulative Distribution Function") ax.set_xlabel("TX Amplitude") ax.set_ylabel("Ratio of Samples larger than x") - ax = plt.subplot(2,3,4) - coefs_history = np.array(self.coefs_history) - for idx, coef_hist in enumerate(coefs_history.T): - ax.plot(coef_hist, - label="Coef {}".format(idx), - linewidth=0.5) + ax = plt.subplot(4, 2, i_sub) + i_sub += 1 + ax.semilogy(np.array(self.mses_am) + 1e-10, label="log(MSE)") + ax.semilogy(np.array(self.errs_am) + 1e-10, label="log(ERR)") ax.legend(loc=4) - ax.set_title("AM/AM Coefficient History") + ax.set_title("MSE History") ax.set_xlabel("Iterations") - ax.set_ylabel("Coefficient Value") + ax.set_ylabel("MSE") - ax = plt.subplot(2,3,5) + ax = plt.subplot(4, 2, i_sub) + i_sub += 1 ax.plot(self.tx_mers, label="TX MER") ax.plot(self.rx_mers, label="RX MER") ax.legend(loc=4) @@ -199,20 +249,83 @@ class Model: ax.set_xlabel("Iterations") ax.set_ylabel("MER") - ax = plt.subplot(2,3,6) - ax.plot(self.mses, label="MSE") - ax.plot(self.errs, label="ERR") + ax = plt.subplot(4, 2, i_sub) + rx_range = np.linspace(0, 1, num=100, dtype=np.complex64) + rx_range_dpd = self.dpd_amplitude(rx_range)[0] + rx_range_dpd_new = self.dpd_amplitude(rx_range, new_coefs_am)[0] + i_sub += 1 + ax.scatter( + np.abs(tx_choice), + np.abs(rx_choice), + s=0.1) + ax.plot(rx_range_dpd / self.coefs_am[0], rx_range, linewidth=0.25, label="current") + ax.plot(rx_range_dpd_new / self.coefs_am[0], rx_range, linewidth=0.25, label="next") + ax.set_ylim(0, 1) + ax.set_xlim(0, 1) + ax.legend() + ax.set_title("Amplifier Characteristic") + ax.set_xlabel("TX Amplitude") + ax.set_ylabel("RX Amplitude") + + ax = plt.subplot(4, 2, i_sub) + i_sub += 1 + coefs_am_history = np.array(self.coefs_am_history) + for idx, coef_hist in enumerate(coefs_am_history.T): + ax.plot(coef_hist, + label="Coef {}".format(idx), + linewidth=0.5) + ax.legend(loc=4) + ax.set_title("AM/AM Coefficient History") + ax.set_xlabel("Iterations") + ax.set_ylabel("Coefficient Value") + + phase_range = np.linspace(0, 1, num=100, dtype=np.complex64) + phase_range_dpd = self.dpd_phase(phase_range)[0] + phase_range_dpd_new = self.dpd_phase(phase_range, + coefs=new_coefs_pm)[0] + ax = plt.subplot(4, 2, i_sub) + i_sub += 1 + ax.scatter( + np.abs(tx_choice), + np.rad2deg(phase_diff_choice), + s=0.1) + ax.plot( + np.abs(phase_range), + np.rad2deg(phase_range_dpd), + linewidth=0.25, + label="current") + ax.plot( + np.abs(phase_range), + np.rad2deg(phase_range_dpd_new), + linewidth=0.25, + label="next") + ax.set_ylim(-60, 60) + ax.set_xlim(0, 1) + ax.legend() + ax.set_title("Amplifier Characteristic") + ax.set_xlabel("TX Amplitude") + ax.set_ylabel("Phase Difference") + + ax = plt.subplot(4, 2, i_sub) + i_sub += 1 + coefs_pm_history = np.array(self.coefs_pm_history) + for idx, coef_phase_hist in enumerate(coefs_pm_history.T): + ax.plot(coef_phase_hist, + label="Coef {}".format(idx), + linewidth=0.5) ax.legend(loc=4) - ax.set_title("MSE History") + ax.set_title("AM/PM Coefficient History") ax.set_xlabel("Iterations") - ax.set_ylabel("MSE") + ax.set_ylabel("Coefficient Value") fig.tight_layout() fig.savefig(fig_path) fig.clf() - self.coefs_am = new_coefs - self.coefs_history.append(self.coefs_am) + self.coefs_am = new_coefs_am + self.coefs_am_history.append(self.coefs_am) + self.coefs_pm = new_coefs_pm + self.coefs_pm_history.append(self.coefs_pm) return self.coefs_am, self.coefs_pm # The MIT License (MIT) |