From 5506c7bc287e23836b953d732829b48c997b878a Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 22 Dec 2018 16:36:18 +0100 Subject: GUI: Update Adapt to use zmqrc lib --- python/dpd/Adapt.py | 278 ++++++++++++++++++---------------------------------- python/dpdce.py | 4 +- python/lib/zmqrc.py | 9 +- 3 files changed, 105 insertions(+), 186 deletions(-) diff --git a/python/dpd/Adapt.py b/python/dpd/Adapt.py index 8108375..9c2c2b6 100644 --- a/python/dpd/Adapt.py +++ b/python/dpd/Adapt.py @@ -9,40 +9,38 @@ This module is used to change settings of ODR-DabMod using the ZMQ remote control socket. """ -import zmq import logging import numpy as np -import os -import datetime +import os.path import pickle +from lib import zmqrc +from typing import List LUT_LEN = 32 FORMAT_POLY = 1 FORMAT_LUT = 2 -def _write_poly_coef_file(coefs_am, coefs_pm, path): - assert (len(coefs_am) == len(coefs_pm)) +def _write_poly_coef_file(coefs_am: List[float], coefs_pm: List[float], path: str) -> None: + assert len(coefs_am) == len(coefs_pm) - f = open(path, 'w') - f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am))) - for coef in coefs_am: - f.write("{}\n".format(coef)) - for coef in coefs_pm: - f.write("{}\n".format(coef)) - f.close() + with open(path, 'w') as f: + f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am))) + for coef in coefs_am: + f.write("{}\n".format(coef)) + for coef in coefs_pm: + f.write("{}\n".format(coef)) -def _write_lut_file(scalefactor, lut, path): - assert (len(lut) == LUT_LEN) +def _write_lut_file(scalefactor: float, lut: List[complex], path: str) -> None: + assert len(lut) == LUT_LEN - f = open(path, 'w') - f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor)) - for coef in lut: - f.write("{}\n{}\n".format(coef.real, coef.imag)) - f.close() + with open(path, 'w') as f: + f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor)) + for coef in lut: + f.write("{}\n{}\n".format(coef.real, coef.imag)) -def dpddata_to_str(dpddata): +def dpddata_to_str(dpddata) -> str: if dpddata[0] == "poly": coefs_am = dpddata[1] coefs_pm = dpddata[2] @@ -57,215 +55,133 @@ def dpddata_to_str(dpddata): raise ValueError("Unknown dpddata type {}".format(dpddata[0])) class Adapt: - """Uses the ZMQ remote control to change parameters of the DabMod + """Uses the ZMQ remote control to change parameters of the DabMod """ - Parameters - ---------- - port : int - Port at which the ODR-DabMod is listening to connect the - ZMQ remote control. - """ - - def __init__(self, port, coef_path, plot_location): + def __init__(self, port: int, coef_path: str, plot_location: str): logging.debug("Instantiate Adapt object") - self.port = port - self.coef_path = coef_path - self.plot_location = plot_location - self.host = "localhost" - self._context = zmq.Context() - - def _connect(self): - """Establish the connection to ODR-DabMod using - a ZMQ socket that is in request mode (Client). - Returns a socket""" - sock = self._context.socket(zmq.REQ) - poller = zmq.Poller() - poller.register(sock, zmq.POLLIN) - - sock.connect("tcp://%s:%d" % (self.host, self.port)) - - sock.send(b"ping") - - socks = dict(poller.poll(1000)) - if socks: - if socks.get(sock) == zmq.POLLIN: - data = [el.decode() for el in sock.recv_multipart()] - - if data != ['ok']: - raise RuntimeError( - "Invalid ZMQ RC answer to 'ping' at %s %d: %s" % - (self.host, self.port, data)) - else: - sock.close(linger=10) - raise RuntimeError( - "ZMQ RC does not respond to 'ping' at %s %d" % - (self.host, self.port)) - - return sock - - def send_receive(self, message): - """Send a message to ODR-DabMod. It always - returns the answer ODR-DabMod sends back. - - An example message could be - "get sdr txgain" or "set sdr txgain 50" - - Parameter - --------- - message : str - The message string that will be sent to the receiver. - """ - sock = self._connect() - logging.debug("Send message: %s" % message) - msg_parts = message.split(" ") - for i, part in enumerate(msg_parts): - if i == len(msg_parts) - 1: - f = 0 - else: - f = zmq.SNDMORE - - sock.send(part.encode(), flags=f) - - data = [el.decode() for el in sock.recv_multipart()] - logging.debug("Received message: %s" % message) - return data - - def set_txgain(self, gain): - """Set a new txgain for the ODR-DabMod. + self._port = port + self._coef_path = coef_path + self._plot_location = plot_location + self._host = "localhost" + self._mod_rc = zmqrc.ModRemoteControl(self._host, self._port) - Parameters - ---------- - gain : int - new TX gain, in the same format as ODR-DabMod's config file - """ + def set_txgain(self, gain : float) -> None: # TODO this is specific to the B200 if gain < 0 or gain > 89: raise ValueError("Gain has to be in [0,89]") - return self.send_receive("set sdr txgain %.4f" % float(gain)) - - def get_txgain(self): - """Get the txgain value in dB for the ODR-DabMod.""" - # TODO handle failure - return float(self.send_receive("get sdr txgain")[0]) + self._mod_rc.set_param_value("sdr", "txgain", "%.4f" % float(gain)) - def set_rxgain(self, gain): - """Set a new rxgain for the ODR-DabMod. + def get_txgain(self) -> float: + """Get the txgain value in dB, or -1 in case of error""" + try: + return float(self._mod_rc.get_param_value("sdr", "txgain")) + except ValueError as e: + logging.warning(f"Adapt: get_txgain error: {e}") + return -1.0 - Parameters - ---------- - gain : int - new RX gain, in the same format as ODR-DabMod's config file - """ + def set_rxgain(self, gain: float) -> None: # TODO this is specific to the B200 if gain < 0 or gain > 89: raise ValueError("Gain has to be in [0,89]") - return self.send_receive("set sdr rxgain %.4f" % float(gain)) - - def get_rxgain(self): - """Get the rxgain value in dB for the ODR-DabMod.""" - # TODO handle failure - return float(self.send_receive("get sdr rxgain")[0]) - - def set_digital_gain(self, gain): - """Set a new rxgain for the ODR-DabMod. - - Parameters - ---------- - gain : int - new RX gain, in the same format as ODR-DabMod's config file - """ - msg = "set gain digital %.5f" % gain - return self.send_receive(msg) - - def get_digital_gain(self): - """Get the rxgain value in dB for the ODR-DabMod.""" - # TODO handle failure - return float(self.send_receive("get gain digital")[0]) + self._mod_rc.set_param_value("sdr", "rxgain", "%.4f" % float(gain)) + + def get_rxgain(self) -> float: + """Get the rxgain value in dB, or -1 in case of error""" + try: + return float(self._mod_rc.get_param_value("sdr", "rxgain")) + except ValueError as e: + logging.warning(f"Adapt: get_rxgain error: {e}") + return -1.0 + + def set_digital_gain(self, gain: float) -> None: + self._mod_rc.set_param_value("gain", "digital", "%.5f" % float(gain)) + + def get_digital_gain(self) -> float: + """Get the digital gain value in linear scale, or -1 in case + of error""" + try: + return float(self._mod_rc.get_param_value("gain", "digital")) + except ValueError as e: + logging.warning(f"Adapt: get_digital_gain error: {e}") + return -1.0 def get_predistorter(self): """Load the coefficients from the file in the format given in the README, return ("poly", [AM coef], [PM coef]) or ("lut", scalefactor, [LUT entries]) """ - f = open(self.coef_path, 'r') - lines = f.readlines() - predistorter_format = int(lines[0]) - if predistorter_format == FORMAT_POLY: - coefs_am_out = [] - coefs_pm_out = [] - n_coefs = int(lines[1]) - coefs = [float(l) for l in lines[2:]] - i = 0 - for c in coefs: - if i < n_coefs: - coefs_am_out.append(c) - elif i < 2 * n_coefs: - coefs_pm_out.append(c) - else: - raise ValueError( - 'Incorrect coef file format: too many' - ' coefficients in {}, should be {}, coefs are {}' - .format(self.coef_path, n_coefs, coefs)) - i += 1 - f.close() - return 'poly', coefs_am_out, coefs_pm_out - elif predistorter_format == FORMAT_LUT: - scalefactor = int(lines[1]) - coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32) - coefs = coefs.reshape((-1, 2)) - lut = coefs[..., 0] + 1j * coefs[..., 1] - if len(lut) != LUT_LEN: - raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN)) - return 'lut', scalefactor, lut - else: - raise ValueError("Unknown predistorter format {}".format(predistorter_format)) + with open(self._coef_path, 'r') as f: + lines = f.readlines() + predistorter_format = int(lines[0]) + if predistorter_format == FORMAT_POLY: + coefs_am_out = [] + coefs_pm_out = [] + n_coefs = int(lines[1]) + coefs = [float(l) for l in lines[2:]] + for i, c in enumerate(coefs): + if i < n_coefs: + coefs_am_out.append(c) + elif i < 2 * n_coefs: + coefs_pm_out.append(c) + else: + raise ValueError( + 'Incorrect coef file format: too many' + ' coefficients in {}, should be {}, coefs are {}' + .format(self._coef_path, n_coefs, coefs)) + return 'poly', coefs_am_out, coefs_pm_out + elif predistorter_format == FORMAT_LUT: + scalefactor = int(lines[1]) + coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32) + coefs = coefs.reshape((-1, 2)) + lut = coefs[..., 0] + 1j * coefs[..., 1] + if len(lut) != LUT_LEN: + raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN)) + return 'lut', scalefactor, lut + else: + raise ValueError("Unknown predistorter format {}".format(predistorter_format)) - def set_predistorter(self, dpddata): + def set_predistorter(self, dpddata) -> None: """Update the predistorter data in the modulator. Takes the same tuple format as argument than the one returned get_predistorter()""" if dpddata[0] == "poly": coefs_am = dpddata[1] coefs_pm = dpddata[2] - _write_poly_coef_file(coefs_am, coefs_pm, self.coef_path) + _write_poly_coef_file(coefs_am, coefs_pm, self._coef_path) elif dpddata[0] == "lut": scalefactor = dpddata[1] lut = dpddata[2] - _write_lut_file(scalefactor, lut, self.coef_path) + _write_lut_file(scalefactor, lut, self._coef_path) else: raise ValueError("Unknown predistorter '{}'".format(dpddata[0])) - return self.send_receive("set memlesspoly coeffile {}".format(self.coef_path)) + self._mod_rc.set_param_value("memlesspoly", "coefffile", self._coef_path) - def dump(self, path=None): + def dump(self, path: str) -> None: """Backup current settings to a file""" - dt = datetime.datetime.now().isoformat() - if path is None: - if self.plot_location is not None: - path = self.plot_location + "/" + dt + "_adapt.pkl" - else: - raise Exception("Cannot dump Adapt without either plot_location or path set") + d = { "txgain": self.get_txgain(), "rxgain": self.get_rxgain(), "digital_gain": self.get_digital_gain(), "predistorter": self.get_predistorter() } + with open(path, "wb") as f: pickle.dump(d, f) - return path - - def load(self, path): + def load(self, path: str) -> None: """Restore settings from a file""" with open(path, "rb") as f: d = pickle.load(f) - self.set_txgain(d["txgain"]) + self.set_txgain(0) self.set_digital_gain(d["digital_gain"]) self.set_rxgain(d["rxgain"]) self.set_predistorter(d["predistorter"]) + self.set_txgain(d["txgain"]) # The MIT License (MIT) # -# Copyright (c) 2017 Andreas Steger, Matthias P. Braendli +# Copyright (c) 2018 Matthias P. Braendli +# Copyright (c) 2017 Andreas Steger # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal diff --git a/python/dpdce.py b/python/dpdce.py index 3b294eb..6c373fd 100755 --- a/python/dpdce.py +++ b/python/dpdce.py @@ -310,7 +310,9 @@ def engine_worker(): txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples() # Store all settings for pre-distortion, tx and rx - adapt.dump() + utctime = datetime.datetime.utcnow() + dump_file = "adapt_{}.pkl".format(utctime.strftime("%s")) + adapt.dump(os.path.join(plot_path, dump_file)) # Collect logging data off = symbol_align.calc_offset(txframe_aligned) diff --git a/python/lib/zmqrc.py b/python/lib/zmqrc.py index 2d82b3e..423f91d 100644 --- a/python/lib/zmqrc.py +++ b/python/lib/zmqrc.py @@ -22,15 +22,16 @@ # along with ODR-DabMod. If not, see . import zmq import json +from typing import List -class ModRemoteControl(object): +class ModRemoteControl: """Interact with ODR-DabMod using the ZMQ RC""" def __init__(self, mod_host, mod_port=9400): self._host = mod_host self._port = mod_port self._ctx = zmq.Context() - def _read(self, message_parts): + def _read(self, message_parts: List[str]): sock = zmq.Socket(self._ctx, zmq.REQ) sock.setsockopt(zmq.LINGER, 0) sock.connect("tcp://{}:{}".format(self._host, self._port)) @@ -70,14 +71,14 @@ class ModRemoteControl(object): return modules - def get_param_value(self, module, param): + def get_param_value(self, module: str, param: str) -> str: value = self._read(['get', module, param]) if value[0] == 'fail': raise ValueError("Error getting param: {}".format(value[1])) else: return value[0] - def set_param_value(self, module, param, value): + def set_param_value(self, module: str, param: str, value: str) -> None: ret = self._read(['set', module, param, value]) if ret[0] == 'fail': raise ValueError("Error setting param: {}".format(ret[1])) -- cgit v1.2.3