aboutsummaryrefslogtreecommitdiffstats
path: root/python/dpdce.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/dpdce.py')
-rwxr-xr-xpython/dpdce.py304
1 files changed, 262 insertions, 42 deletions
diff --git a/python/dpdce.py b/python/dpdce.py
index 838d265..cf98aa0 100755
--- a/python/dpdce.py
+++ b/python/dpdce.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# DPD Computation Engine standalone main file.
@@ -43,7 +43,8 @@ rc_port = config.getint('rc_port')
samplerate = config.getint('samplerate')
samps = config.getint('samps')
coef_file = config['coef_file']
-log_folder = config['log_folder']
+logs_directory = config['logs_directory']
+plot_directory = config['plot_directory']
import logging
import datetime
@@ -52,7 +53,7 @@ save_logs = False
# Simple usage scenarios don't need to clutter /tmp
if save_logs:
- dt = datetime.datetime.now().isoformat()
+ dt = datetime.datetime.utcnow().isoformat()
logging_path = '/tmp/dpd_{}'.format(dt).replace('.', '_').replace(':', '-')
print("Logs and plots written to {}".format(logging_path))
os.makedirs(logging_path)
@@ -71,7 +72,7 @@ if save_logs:
# add the handler to the root logger
logging.getLogger('').addHandler(console)
else:
- dt = datetime.datetime.now().isoformat()
+ dt = datetime.datetime.utcnow().isoformat()
logging.basicConfig(format='%(asctime)s - %(module)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.INFO)
@@ -79,10 +80,14 @@ else:
logging.info("DPDCE starting up");
+import time
import socket
from lib import yamlrpc
import numpy as np
import traceback
+import os.path
+import glob
+import re
from threading import Thread, Lock
from queue import Queue
from dpd.Model import Poly
@@ -96,13 +101,15 @@ from dpd.GlobalConfig import GlobalConfig
from dpd.MER import MER
from dpd.Measure_Shoulders import Measure_Shoulders
-c = GlobalConfig(samplerate, logging_path)
+plot_path = os.path.realpath(plot_directory)
+coef_file = os.path.realpath(config['coef_file'])
+
+c = GlobalConfig(samplerate, plot_path)
symbol_align = Symbol_align(c)
mer = MER(c)
meas_shoulders = Measure_Shoulders(c)
meas = Measure(c, samplerate, dpd_port, samps)
-extStat = ExtractStatistic(c)
-adapt = Adapt(rc_port, coef_file, logging_path)
+adapt = Adapt(rc_port, coef_file, plot_path)
model = Poly(c)
@@ -128,54 +135,271 @@ if cli_args.reset:
cmd_socket = yamlrpc.Socket(bind_port=control_port)
# The following is accessed by both threads and need to be locked
-settings = {
- 'rx_gain': rx_gain,
- 'tx_gain': tx_gain,
- 'digital_gain': digital_gain,
- 'dpddata': dpddata,
+internal_data = {
+ 'n_runs': 0,
}
results = {
+ 'adapt_dumps': [],
+ 'statplot': None,
+ 'modelplot': None,
+ 'modeldata': dpddata_to_str(dpddata),
'tx_median': 0,
'rx_median': 0,
'state': 'Idle',
+ 'stateprogress': 0, # in percent
'summary': ['DPD has not been calibrated yet'],
}
lock = Lock()
command_queue = Queue(maxsize=1)
+# Fill list of adapt dumps so that user can choose a previous
+# setting across restarts.
+results['adapt_dumps'].append("defaults")
+
+adapt_dump_files = glob.glob(os.path.join(plot_path, "adapt_*.pkl"))
+re_adaptfile = re.compile(r"adapt_(.*)\.pkl")
+for f in adapt_dump_files:
+ match = re_adaptfile.search(f)
+ if match:
+ results['adapt_dumps'].append(match.group(1))
+
# Automatic Gain Control for the RX gain
agc = Agc(meas, adapt, c)
+def clear_pngs(results):
+ results['statplot'] = None
+ results['modelplot'] = None
+ pngs = glob.glob(os.path.join(plot_path, "*.png"))
+ for png in pngs:
+ try:
+ os.remove(png)
+ except:
+ results['summary'] += ["failed to delete " + png]
+
def engine_worker():
- try:
- while True:
+ extStat = None
+ while True:
+ try:
cmd = command_queue.get()
if cmd == "quit":
break
elif cmd == "calibrate":
with lock:
- results['state'] = 'rx gain calibration'
+ results['state'] = 'RX Gain Calibration'
+ results['stateprogress'] = 0
+ clear_pngs(results)
- agc_success, agc_summary = agc.run()
- summary = ["First calibration run:"] + agc_summary.split("\n")
- if agc_success:
+ summary = []
+ N_ITER = 3
+ for i in range(N_ITER):
agc_success, agc_summary = agc.run()
- summary += ["Second calibration run: "] + agc_summary.split("\n")
+ summary += ["Iteration {}:".format(i)] + agc_summary.split("\n")
+
+ with lock:
+ results['stateprogress'] = int((i + 1) * 100/N_ITER)
+ results['summary'] = ["Calibration ongoing:"] + summary
+
+ if not agc_success:
+ break
txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples()
with lock:
- settings['rx_gain'] = adapt.get_rxgain()
- settings['digital_gain'] = adapt.get_digital_gain()
results['tx_median'] = float(tx_median)
results['rx_median'] = float(rx_median)
results['state'] = 'Idle'
- results['summary'] = ["Calibration was done:"] + summary
+ results['stateprogress'] = 100
+ results['summary'] = summary + ["Calibration done"]
+ elif cmd == "reset":
+ model.reset_coefs()
+ with lock:
+ internal_data['n_runs'] = 0
+ results['state'] = 'Idle'
+ results['stateprogress'] = 0
+ results['summary'] = ["Reset"]
+ results['modeldata'] = dpddata_to_str(model.get_dpd_data())
+ clear_pngs(results)
+ extStat = None
+ elif cmd == "trigger_run":
+ with lock:
+ results['state'] = 'Capture + Model'
+ results['stateprogress'] = 0
+ n_runs = internal_data['n_runs']
+
+ while True:
+ # Get Samples and check gain
+ txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples()
+
+ if extStat is None:
+ # At first run, we must decide how to create the bins
+ peak_estimated = tx_median * c.median_to_peak
+ extStat = ExtractStatistic(c, peak_estimated)
+
+ with lock:
+ results['stateprogress'] += 2
+
+ # Extract usable data from measurement
+ tx, rx, phase_diff, n_per_bin = extStat.extract(txframe_aligned, rxframe_aligned)
+
+ utctime = datetime.datetime.utcnow()
+ plot_file = "stats_{}.png".format(utctime.strftime("%s"))
+ extStat.plot(os.path.join(plot_path, plot_file), utctime.strftime("%Y-%m-%dT%H%M%S"))
+ n_meas = Heuristics.get_n_meas(n_runs)
+
+ with lock:
+ results['statplot'] = "dpd/" + plot_file
+ results['stateprogress'] += 2
+ results['summary'] = ["Captured {} samples".format(len(txframe_aligned)),
+ "TX/RX median: {} / {}".format(tx_median, rx_median),
+ extStat.get_bin_info(),
+ "Extracted Statistics: TX median={} RX median={}".format(tx_median, rx_median),
+ "Runs: {}/{}".format(extStat.n_meas, n_meas)]
+ if extStat.n_meas >= n_meas:
+ break
+
+ if any(x is None for x in [tx, rx, phase_diff]):
+ with lock:
+ results['summary'] += ["Error! No data to calculate model"]
+ results['state'] = 'Idle'
+ results['stateprogress'] = 0
+ else:
+ with lock:
+ results['state'] = 'Capture + Model'
+ results['stateprogress'] = 80
+ results['summary'] += ["Training model"]
+
+ model.train(tx, rx, phase_diff, lr=Heuristics.get_learning_rate(n_runs))
+
+ utctime = datetime.datetime.utcnow()
+ model_plot_file = "model_{}.png".format(utctime.strftime("%s"))
+ model.plot(
+ os.path.join(plot_path, model_plot_file),
+ utctime.strftime("%Y-%m-%dT%H%M%S"))
+
+ with lock:
+ results['modelplot'] = "dpd/" + model_plot_file
+ results['state'] = 'Capture + Model'
+ results['stateprogress'] = 85
+ results['summary'] += ["Getting DPD data"]
+
+ dpddata = model.get_dpd_data()
+ with lock:
+ internal_data['dpddata'] = dpddata
+ internal_data['n_runs'] = 0
+
+ results['modeldata'] = dpddata_to_str(dpddata)
+ results['state'] = 'Capture + Model'
+ results['stateprogress'] = 90
+ results['summary'] += ["Reset statistics"]
+
+ extStat = None
+
+ with lock:
+ results['state'] = 'Idle'
+ results['stateprogress'] = 100
+ results['summary'] += ["New DPD coefficients calculated"]
+ elif cmd == "adapt":
+ with lock:
+ dpddata = internal_data['dpddata']
+ results['state'] = 'Update Predistorter'
+ results['stateprogress'] = 50
+ results['summary'] = [""]
+ iteration = internal_data['n_runs']
+ internal_data['n_runs'] += 1
+
+ adapt.set_predistorter(dpddata)
- finally:
- with lock:
- results['state'] = 'terminated'
+ time.sleep(2)
+
+ txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples()
+
+ # Store all settings for pre-distortion, tx and rx
+ utctime = datetime.datetime.utcnow()
+ dump_file = "adapt_{}.pkl".format(utctime.strftime("%s"))
+ adapt.dump(os.path.join(plot_path, dump_file))
+
+ with lock:
+ results['adapt_dumps'].append(utctime.strftime("%s"))
+
+ # Collect logging data
+ off = symbol_align.calc_offset(txframe_aligned)
+ tx_mer = mer.calc_mer(txframe_aligned[off:off + c.T_U], debug_name='TX')
+ rx_mer = mer.calc_mer(rxframe_aligned[off:off + c.T_U], debug_name='RX')
+ mse = np.mean(np.abs((txframe_aligned - rxframe_aligned) ** 2))
+ tx_gain = adapt.get_txgain()
+ rx_gain = adapt.get_rxgain()
+ digital_gain = adapt.get_digital_gain()
+ rx_shoulder_tuple = meas_shoulders.average_shoulders(rxframe_aligned)
+ tx_shoulder_tuple = meas_shoulders.average_shoulders(txframe_aligned)
+
+ lr = Heuristics.get_learning_rate(iteration)
+
+ summary = [f"Set predistorter:",
+ f"Signal measurements after iteration {iteration} with learning rate {lr}",
+ f"TX MER {tx_mer:.2}, RX MER {rx_mer:.2}",
+ f"Mean-square error: {mse:.3}"]
+ if tx_shoulder_tuple is not None:
+ summary.append("Shoulders: TX {!r}, RX {!r}".format(tx_shoulder_tuple, rx_shoulder_tuple))
+ summary.append(f"Running with digital gain {digital_gain}, TX gain {tx_gain} and RX gain {rx_gain}")
+
+ with lock:
+ results['state'] = 'Update Predistorter'
+ results['stateprogress'] = 100
+ results['summary'] = ["Signal measurements after predistortion update"] + summary
+ elif cmd.startswith("restore_dump-"):
+ _, _, dump_id = cmd.partition("-")
+ if dump_id == "defaults":
+ model.reset_coefs()
+ dpddata = model.get_dpd_data()
+ adapt.set_predistorter(dpddata)
+
+ tx_gain = adapt.get_txgain()
+ rx_gain = adapt.get_rxgain()
+ digital_gain = adapt.get_digital_gain()
+ with lock:
+ results['state'] = 'Idle'
+ results['stateprogress'] = 100
+ results['summary'] = [f"Restored DPD defaults",
+ f"Running with digital gain {digital_gain}, TX gain {tx_gain} and RX gain {rx_gain}"]
+ results['modeldata'] = dpddata_to_str(dpddata)
+ else:
+ dump_file = os.path.join(plot_path, f"adapt_{dump_id}.pkl")
+ try:
+ d = adapt.restore(dump_file)
+ logging.info(f"Restore: {d}")
+ model.set_dpd_data(d['dpddata'])
+ with lock:
+ results['state'] = 'Idle'
+ results['stateprogress'] = 100
+ results['summary'] = [f"Restored DPD settings from dumpfile {dump_id}",
+ f"Running with digital gain {d['digital_gain']}, TX gain {d['txgain']} and RX gain {d['rxgain']}"]
+ results['modeldata'] = dpddata_to_str(d["dpddata"])
+ except:
+ e = traceback.format_exc()
+ with lock:
+ results['state'] = 'Idle'
+ results['stateprogress'] = 100
+ results['summary'] = [f"Failed to restore DPD settings from dumpfile {dump_id}",
+ f"Error: {e}"]
+ except:
+ e = traceback.format_exc()
+ logging.error(e)
+ with lock:
+ results['summary'] = [f"Exception:"] + e.split("\n")
+ results['state'] = 'Autorestart pending'
+ results['stateprogress'] = 0
+
+ for i in range(5):
+ time.sleep(2)
+ with lock:
+ results['stateprogress'] += 20
+ time.sleep(2)
+ with lock:
+ dt = datetime.datetime.utcnow().isoformat()
+ results['summary'] = [f"DPD engine auto-restarted at {dt} UTC", f"After exception {e}"]
+ results['state'] = 'Idle'
+ results['stateprogress'] = 0
engine = Thread(target=engine_worker)
@@ -186,7 +410,7 @@ try:
try:
addr, msg_id, method, params = cmd_socket.receive_request()
except ValueError as e:
- logging.warning('YAML-RPC request error: {}'.format(e))
+ logging.warning('RPC request error: {}'.format(e))
continue
except TimeoutError:
continue
@@ -194,28 +418,24 @@ try:
logging.info('Caught KeyboardInterrupt')
break
except:
- logging.error('YAML-RPC unknown error')
+ logging.error('RPC unknown error')
break
- if method == 'trigger_run':
- logging.info('YAML-RPC request : {}'.format(method))
- command_queue.put('trigger_run')
- elif method == 'reset':
- logging.info('YAML-RPC request : {}'.format(method))
- command_queue.put('reset')
- elif method == 'set_setting':
- logging.info('YAML-RPC request : {} -> {}'.format(method, params))
- # params == {'setting': ..., 'value': ...}
- pass
- elif method == 'get_settings':
- with lock:
- cmd_socket.send_success_response(addr, msg_id, settings)
+ if any(method == m for m in ['trigger_run', 'reset', 'adapt']):
+ logging.info('Received RPC request : {}'.format(method))
+ command_queue.put(method)
+ cmd_socket.send_success_response(addr, msg_id, None)
+ elif method == 'restore_dump':
+ logging.info('Received RPC request : restore_dump({})'.format(params['dump_id']))
+ command_queue.put(f"restore_dump-{params['dump_id']}")
+ cmd_socket.send_success_response(addr, msg_id, None)
elif method == 'get_results':
with lock:
cmd_socket.send_success_response(addr, msg_id, results)
elif method == 'calibrate':
- logging.info('YAML-RPC request : {}'.format(method))
+ logging.info('Received RPC request : {}'.format(method))
command_queue.put('calibrate')
+ cmd_socket.send_success_response(addr, msg_id, None)
else:
cmd_socket.send_error_response(addr, msg_id, "request not understood")
finally:
@@ -346,7 +566,7 @@ while i < num_iter:
# The MIT License (MIT)
#
# Copyright (c) 2017 Andreas Steger
-# Copyright (c) 2018 Matthias P. Braendli
+# Copyright (c) 2019 Matthias P. Braendli
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal