diff options
Diffstat (limited to 'python/dpdce.py')
-rwxr-xr-x | python/dpdce.py | 304 |
1 files changed, 262 insertions, 42 deletions
diff --git a/python/dpdce.py b/python/dpdce.py index 838d265..cf98aa0 100755 --- a/python/dpdce.py +++ b/python/dpdce.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # DPD Computation Engine standalone main file. @@ -43,7 +43,8 @@ rc_port = config.getint('rc_port') samplerate = config.getint('samplerate') samps = config.getint('samps') coef_file = config['coef_file'] -log_folder = config['log_folder'] +logs_directory = config['logs_directory'] +plot_directory = config['plot_directory'] import logging import datetime @@ -52,7 +53,7 @@ save_logs = False # Simple usage scenarios don't need to clutter /tmp if save_logs: - dt = datetime.datetime.now().isoformat() + dt = datetime.datetime.utcnow().isoformat() logging_path = '/tmp/dpd_{}'.format(dt).replace('.', '_').replace(':', '-') print("Logs and plots written to {}".format(logging_path)) os.makedirs(logging_path) @@ -71,7 +72,7 @@ if save_logs: # add the handler to the root logger logging.getLogger('').addHandler(console) else: - dt = datetime.datetime.now().isoformat() + dt = datetime.datetime.utcnow().isoformat() logging.basicConfig(format='%(asctime)s - %(module)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=logging.INFO) @@ -79,10 +80,14 @@ else: logging.info("DPDCE starting up"); +import time import socket from lib import yamlrpc import numpy as np import traceback +import os.path +import glob +import re from threading import Thread, Lock from queue import Queue from dpd.Model import Poly @@ -96,13 +101,15 @@ from dpd.GlobalConfig import GlobalConfig from dpd.MER import MER from dpd.Measure_Shoulders import Measure_Shoulders -c = GlobalConfig(samplerate, logging_path) +plot_path = os.path.realpath(plot_directory) +coef_file = os.path.realpath(config['coef_file']) + +c = GlobalConfig(samplerate, plot_path) symbol_align = Symbol_align(c) mer = MER(c) meas_shoulders = Measure_Shoulders(c) meas = Measure(c, samplerate, dpd_port, samps) -extStat = ExtractStatistic(c) -adapt = Adapt(rc_port, coef_file, logging_path) +adapt = Adapt(rc_port, coef_file, plot_path) model = Poly(c) @@ -128,54 +135,271 @@ if cli_args.reset: cmd_socket = yamlrpc.Socket(bind_port=control_port) # The following is accessed by both threads and need to be locked -settings = { - 'rx_gain': rx_gain, - 'tx_gain': tx_gain, - 'digital_gain': digital_gain, - 'dpddata': dpddata, +internal_data = { + 'n_runs': 0, } results = { + 'adapt_dumps': [], + 'statplot': None, + 'modelplot': None, + 'modeldata': dpddata_to_str(dpddata), 'tx_median': 0, 'rx_median': 0, 'state': 'Idle', + 'stateprogress': 0, # in percent 'summary': ['DPD has not been calibrated yet'], } lock = Lock() command_queue = Queue(maxsize=1) +# Fill list of adapt dumps so that user can choose a previous +# setting across restarts. +results['adapt_dumps'].append("defaults") + +adapt_dump_files = glob.glob(os.path.join(plot_path, "adapt_*.pkl")) +re_adaptfile = re.compile(r"adapt_(.*)\.pkl") +for f in adapt_dump_files: + match = re_adaptfile.search(f) + if match: + results['adapt_dumps'].append(match.group(1)) + # Automatic Gain Control for the RX gain agc = Agc(meas, adapt, c) +def clear_pngs(results): + results['statplot'] = None + results['modelplot'] = None + pngs = glob.glob(os.path.join(plot_path, "*.png")) + for png in pngs: + try: + os.remove(png) + except: + results['summary'] += ["failed to delete " + png] + def engine_worker(): - try: - while True: + extStat = None + while True: + try: cmd = command_queue.get() if cmd == "quit": break elif cmd == "calibrate": with lock: - results['state'] = 'rx gain calibration' + results['state'] = 'RX Gain Calibration' + results['stateprogress'] = 0 + clear_pngs(results) - agc_success, agc_summary = agc.run() - summary = ["First calibration run:"] + agc_summary.split("\n") - if agc_success: + summary = [] + N_ITER = 3 + for i in range(N_ITER): agc_success, agc_summary = agc.run() - summary += ["Second calibration run: "] + agc_summary.split("\n") + summary += ["Iteration {}:".format(i)] + agc_summary.split("\n") + + with lock: + results['stateprogress'] = int((i + 1) * 100/N_ITER) + results['summary'] = ["Calibration ongoing:"] + summary + + if not agc_success: + break txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples() with lock: - settings['rx_gain'] = adapt.get_rxgain() - settings['digital_gain'] = adapt.get_digital_gain() results['tx_median'] = float(tx_median) results['rx_median'] = float(rx_median) results['state'] = 'Idle' - results['summary'] = ["Calibration was done:"] + summary + results['stateprogress'] = 100 + results['summary'] = summary + ["Calibration done"] + elif cmd == "reset": + model.reset_coefs() + with lock: + internal_data['n_runs'] = 0 + results['state'] = 'Idle' + results['stateprogress'] = 0 + results['summary'] = ["Reset"] + results['modeldata'] = dpddata_to_str(model.get_dpd_data()) + clear_pngs(results) + extStat = None + elif cmd == "trigger_run": + with lock: + results['state'] = 'Capture + Model' + results['stateprogress'] = 0 + n_runs = internal_data['n_runs'] + + while True: + # Get Samples and check gain + txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples() + + if extStat is None: + # At first run, we must decide how to create the bins + peak_estimated = tx_median * c.median_to_peak + extStat = ExtractStatistic(c, peak_estimated) + + with lock: + results['stateprogress'] += 2 + + # Extract usable data from measurement + tx, rx, phase_diff, n_per_bin = extStat.extract(txframe_aligned, rxframe_aligned) + + utctime = datetime.datetime.utcnow() + plot_file = "stats_{}.png".format(utctime.strftime("%s")) + extStat.plot(os.path.join(plot_path, plot_file), utctime.strftime("%Y-%m-%dT%H%M%S")) + n_meas = Heuristics.get_n_meas(n_runs) + + with lock: + results['statplot'] = "dpd/" + plot_file + results['stateprogress'] += 2 + results['summary'] = ["Captured {} samples".format(len(txframe_aligned)), + "TX/RX median: {} / {}".format(tx_median, rx_median), + extStat.get_bin_info(), + "Extracted Statistics: TX median={} RX median={}".format(tx_median, rx_median), + "Runs: {}/{}".format(extStat.n_meas, n_meas)] + if extStat.n_meas >= n_meas: + break + + if any(x is None for x in [tx, rx, phase_diff]): + with lock: + results['summary'] += ["Error! No data to calculate model"] + results['state'] = 'Idle' + results['stateprogress'] = 0 + else: + with lock: + results['state'] = 'Capture + Model' + results['stateprogress'] = 80 + results['summary'] += ["Training model"] + + model.train(tx, rx, phase_diff, lr=Heuristics.get_learning_rate(n_runs)) + + utctime = datetime.datetime.utcnow() + model_plot_file = "model_{}.png".format(utctime.strftime("%s")) + model.plot( + os.path.join(plot_path, model_plot_file), + utctime.strftime("%Y-%m-%dT%H%M%S")) + + with lock: + results['modelplot'] = "dpd/" + model_plot_file + results['state'] = 'Capture + Model' + results['stateprogress'] = 85 + results['summary'] += ["Getting DPD data"] + + dpddata = model.get_dpd_data() + with lock: + internal_data['dpddata'] = dpddata + internal_data['n_runs'] = 0 + + results['modeldata'] = dpddata_to_str(dpddata) + results['state'] = 'Capture + Model' + results['stateprogress'] = 90 + results['summary'] += ["Reset statistics"] + + extStat = None + + with lock: + results['state'] = 'Idle' + results['stateprogress'] = 100 + results['summary'] += ["New DPD coefficients calculated"] + elif cmd == "adapt": + with lock: + dpddata = internal_data['dpddata'] + results['state'] = 'Update Predistorter' + results['stateprogress'] = 50 + results['summary'] = [""] + iteration = internal_data['n_runs'] + internal_data['n_runs'] += 1 + + adapt.set_predistorter(dpddata) - finally: - with lock: - results['state'] = 'terminated' + time.sleep(2) + + txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples() + + # Store all settings for pre-distortion, tx and rx + utctime = datetime.datetime.utcnow() + dump_file = "adapt_{}.pkl".format(utctime.strftime("%s")) + adapt.dump(os.path.join(plot_path, dump_file)) + + with lock: + results['adapt_dumps'].append(utctime.strftime("%s")) + + # Collect logging data + off = symbol_align.calc_offset(txframe_aligned) + tx_mer = mer.calc_mer(txframe_aligned[off:off + c.T_U], debug_name='TX') + rx_mer = mer.calc_mer(rxframe_aligned[off:off + c.T_U], debug_name='RX') + mse = np.mean(np.abs((txframe_aligned - rxframe_aligned) ** 2)) + tx_gain = adapt.get_txgain() + rx_gain = adapt.get_rxgain() + digital_gain = adapt.get_digital_gain() + rx_shoulder_tuple = meas_shoulders.average_shoulders(rxframe_aligned) + tx_shoulder_tuple = meas_shoulders.average_shoulders(txframe_aligned) + + lr = Heuristics.get_learning_rate(iteration) + + summary = [f"Set predistorter:", + f"Signal measurements after iteration {iteration} with learning rate {lr}", + f"TX MER {tx_mer:.2}, RX MER {rx_mer:.2}", + f"Mean-square error: {mse:.3}"] + if tx_shoulder_tuple is not None: + summary.append("Shoulders: TX {!r}, RX {!r}".format(tx_shoulder_tuple, rx_shoulder_tuple)) + summary.append(f"Running with digital gain {digital_gain}, TX gain {tx_gain} and RX gain {rx_gain}") + + with lock: + results['state'] = 'Update Predistorter' + results['stateprogress'] = 100 + results['summary'] = ["Signal measurements after predistortion update"] + summary + elif cmd.startswith("restore_dump-"): + _, _, dump_id = cmd.partition("-") + if dump_id == "defaults": + model.reset_coefs() + dpddata = model.get_dpd_data() + adapt.set_predistorter(dpddata) + + tx_gain = adapt.get_txgain() + rx_gain = adapt.get_rxgain() + digital_gain = adapt.get_digital_gain() + with lock: + results['state'] = 'Idle' + results['stateprogress'] = 100 + results['summary'] = [f"Restored DPD defaults", + f"Running with digital gain {digital_gain}, TX gain {tx_gain} and RX gain {rx_gain}"] + results['modeldata'] = dpddata_to_str(dpddata) + else: + dump_file = os.path.join(plot_path, f"adapt_{dump_id}.pkl") + try: + d = adapt.restore(dump_file) + logging.info(f"Restore: {d}") + model.set_dpd_data(d['dpddata']) + with lock: + results['state'] = 'Idle' + results['stateprogress'] = 100 + results['summary'] = [f"Restored DPD settings from dumpfile {dump_id}", + f"Running with digital gain {d['digital_gain']}, TX gain {d['txgain']} and RX gain {d['rxgain']}"] + results['modeldata'] = dpddata_to_str(d["dpddata"]) + except: + e = traceback.format_exc() + with lock: + results['state'] = 'Idle' + results['stateprogress'] = 100 + results['summary'] = [f"Failed to restore DPD settings from dumpfile {dump_id}", + f"Error: {e}"] + except: + e = traceback.format_exc() + logging.error(e) + with lock: + results['summary'] = [f"Exception:"] + e.split("\n") + results['state'] = 'Autorestart pending' + results['stateprogress'] = 0 + + for i in range(5): + time.sleep(2) + with lock: + results['stateprogress'] += 20 + time.sleep(2) + with lock: + dt = datetime.datetime.utcnow().isoformat() + results['summary'] = [f"DPD engine auto-restarted at {dt} UTC", f"After exception {e}"] + results['state'] = 'Idle' + results['stateprogress'] = 0 engine = Thread(target=engine_worker) @@ -186,7 +410,7 @@ try: try: addr, msg_id, method, params = cmd_socket.receive_request() except ValueError as e: - logging.warning('YAML-RPC request error: {}'.format(e)) + logging.warning('RPC request error: {}'.format(e)) continue except TimeoutError: continue @@ -194,28 +418,24 @@ try: logging.info('Caught KeyboardInterrupt') break except: - logging.error('YAML-RPC unknown error') + logging.error('RPC unknown error') break - if method == 'trigger_run': - logging.info('YAML-RPC request : {}'.format(method)) - command_queue.put('trigger_run') - elif method == 'reset': - logging.info('YAML-RPC request : {}'.format(method)) - command_queue.put('reset') - elif method == 'set_setting': - logging.info('YAML-RPC request : {} -> {}'.format(method, params)) - # params == {'setting': ..., 'value': ...} - pass - elif method == 'get_settings': - with lock: - cmd_socket.send_success_response(addr, msg_id, settings) + if any(method == m for m in ['trigger_run', 'reset', 'adapt']): + logging.info('Received RPC request : {}'.format(method)) + command_queue.put(method) + cmd_socket.send_success_response(addr, msg_id, None) + elif method == 'restore_dump': + logging.info('Received RPC request : restore_dump({})'.format(params['dump_id'])) + command_queue.put(f"restore_dump-{params['dump_id']}") + cmd_socket.send_success_response(addr, msg_id, None) elif method == 'get_results': with lock: cmd_socket.send_success_response(addr, msg_id, results) elif method == 'calibrate': - logging.info('YAML-RPC request : {}'.format(method)) + logging.info('Received RPC request : {}'.format(method)) command_queue.put('calibrate') + cmd_socket.send_success_response(addr, msg_id, None) else: cmd_socket.send_error_response(addr, msg_id, "request not understood") finally: @@ -346,7 +566,7 @@ while i < num_iter: # The MIT License (MIT) # # Copyright (c) 2017 Andreas Steger -# Copyright (c) 2018 Matthias P. Braendli +# Copyright (c) 2019 Matthias P. Braendli # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal |