diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-12-22 16:36:18 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-12-22 16:36:18 +0100 | 
| commit | 5506c7bc287e23836b953d732829b48c997b878a (patch) | |
| tree | 1648c3a1fde0519692315c33baefb4105bb8a120 /python | |
| parent | 00b2423a298887fefc77c24e8067e621878cc108 (diff) | |
| download | dabmod-5506c7bc287e23836b953d732829b48c997b878a.tar.gz dabmod-5506c7bc287e23836b953d732829b48c997b878a.tar.bz2 dabmod-5506c7bc287e23836b953d732829b48c997b878a.zip  | |
GUI: Update Adapt to use zmqrc lib
Diffstat (limited to 'python')
| -rw-r--r-- | python/dpd/Adapt.py | 278 | ||||
| -rwxr-xr-x | python/dpdce.py | 4 | ||||
| -rw-r--r-- | python/lib/zmqrc.py | 9 | 
3 files changed, 105 insertions, 186 deletions
diff --git a/python/dpd/Adapt.py b/python/dpd/Adapt.py index 8108375..9c2c2b6 100644 --- a/python/dpd/Adapt.py +++ b/python/dpd/Adapt.py @@ -9,40 +9,38 @@ This module is used to change settings of ODR-DabMod using  the ZMQ remote control socket.  """ -import zmq  import logging  import numpy as np -import os -import datetime +import os.path  import pickle +from lib import zmqrc +from typing import List  LUT_LEN = 32  FORMAT_POLY = 1  FORMAT_LUT = 2 -def _write_poly_coef_file(coefs_am, coefs_pm, path): -    assert (len(coefs_am) == len(coefs_pm)) +def _write_poly_coef_file(coefs_am: List[float], coefs_pm: List[float], path: str) -> None: +    assert len(coefs_am) == len(coefs_pm) -    f = open(path, 'w') -    f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am))) -    for coef in coefs_am: -        f.write("{}\n".format(coef)) -    for coef in coefs_pm: -        f.write("{}\n".format(coef)) -    f.close() +    with open(path, 'w') as f: +        f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am))) +        for coef in coefs_am: +            f.write("{}\n".format(coef)) +        for coef in coefs_pm: +            f.write("{}\n".format(coef)) -def _write_lut_file(scalefactor, lut, path): -    assert (len(lut) == LUT_LEN) +def _write_lut_file(scalefactor: float, lut: List[complex], path: str) -> None: +    assert len(lut) == LUT_LEN -    f = open(path, 'w') -    f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor)) -    for coef in lut: -        f.write("{}\n{}\n".format(coef.real, coef.imag)) -    f.close() +    with open(path, 'w') as f: +        f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor)) +        for coef in lut: +            f.write("{}\n{}\n".format(coef.real, coef.imag)) -def dpddata_to_str(dpddata): +def dpddata_to_str(dpddata) -> str:      if dpddata[0] == "poly":          coefs_am = dpddata[1]          coefs_pm = dpddata[2] @@ -57,215 +55,133 @@ def dpddata_to_str(dpddata):          raise ValueError("Unknown dpddata type {}".format(dpddata[0]))  class Adapt: -    """Uses the ZMQ remote control to change parameters of the DabMod +    """Uses the ZMQ remote control to change parameters of the DabMod """ -    Parameters -    ---------- -    port : int -        Port at which the ODR-DabMod is listening to connect the -        ZMQ remote control. -    """ - -    def __init__(self, port, coef_path, plot_location): +    def __init__(self, port: int, coef_path: str, plot_location: str):          logging.debug("Instantiate Adapt object") -        self.port = port -        self.coef_path = coef_path -        self.plot_location = plot_location -        self.host = "localhost" -        self._context = zmq.Context() - -    def _connect(self): -        """Establish the connection to ODR-DabMod using -        a ZMQ socket that is in request mode (Client). -        Returns a socket""" -        sock = self._context.socket(zmq.REQ) -        poller = zmq.Poller() -        poller.register(sock, zmq.POLLIN) - -        sock.connect("tcp://%s:%d" % (self.host, self.port)) - -        sock.send(b"ping") - -        socks = dict(poller.poll(1000)) -        if socks: -            if socks.get(sock) == zmq.POLLIN: -                data = [el.decode() for el in sock.recv_multipart()] - -                if data != ['ok']: -                    raise RuntimeError( -                        "Invalid ZMQ RC answer to 'ping' at %s %d: %s" % -                        (self.host, self.port, data)) -        else: -            sock.close(linger=10) -            raise RuntimeError( -                    "ZMQ RC does not respond to 'ping' at %s %d" % -                        (self.host, self.port)) - -        return sock - -    def send_receive(self, message): -        """Send a message to ODR-DabMod. It always -        returns the answer ODR-DabMod sends back. - -        An example message could be -        "get sdr txgain" or "set sdr txgain 50" - -        Parameter -        --------- -        message : str -            The message string that will be sent to the receiver. -        """ -        sock = self._connect() -        logging.debug("Send message: %s" % message) -        msg_parts = message.split(" ") -        for i, part in enumerate(msg_parts): -            if i == len(msg_parts) - 1: -                f = 0 -            else: -                f = zmq.SNDMORE - -            sock.send(part.encode(), flags=f) - -        data = [el.decode() for el in sock.recv_multipart()] -        logging.debug("Received message: %s" % message) -        return data - -    def set_txgain(self, gain): -        """Set a new txgain for the ODR-DabMod. +        self._port = port +        self._coef_path = coef_path +        self._plot_location = plot_location +        self._host = "localhost" +        self._mod_rc = zmqrc.ModRemoteControl(self._host, self._port) -        Parameters -        ---------- -        gain : int -            new TX gain, in the same format as ODR-DabMod's config file -        """ +    def set_txgain(self, gain : float) -> None:          # TODO this is specific to the B200          if gain < 0 or gain > 89:              raise ValueError("Gain has to be in [0,89]") -        return self.send_receive("set sdr txgain %.4f" % float(gain)) - -    def get_txgain(self): -        """Get the txgain value in dB for the ODR-DabMod.""" -        # TODO handle failure -        return float(self.send_receive("get sdr txgain")[0]) +        self._mod_rc.set_param_value("sdr", "txgain", "%.4f" % float(gain)) -    def set_rxgain(self, gain): -        """Set a new rxgain for the ODR-DabMod. +    def get_txgain(self) -> float: +        """Get the txgain value in dB, or -1 in case of error""" +        try: +            return float(self._mod_rc.get_param_value("sdr", "txgain")) +        except ValueError as e: +            logging.warning(f"Adapt: get_txgain error: {e}") +            return -1.0 -        Parameters -        ---------- -        gain : int -            new RX gain, in the same format as ODR-DabMod's config file -        """ +    def set_rxgain(self, gain: float) -> None:          # TODO this is specific to the B200          if gain < 0 or gain > 89:              raise ValueError("Gain has to be in [0,89]") -        return self.send_receive("set sdr rxgain %.4f" % float(gain)) - -    def get_rxgain(self): -        """Get the rxgain value in dB for the ODR-DabMod.""" -        # TODO handle failure -        return float(self.send_receive("get sdr rxgain")[0]) - -    def set_digital_gain(self, gain): -        """Set a new rxgain for the ODR-DabMod. - -        Parameters -        ---------- -        gain : int -            new RX gain, in the same format as ODR-DabMod's config file -        """ -        msg = "set gain digital %.5f" % gain -        return self.send_receive(msg) - -    def get_digital_gain(self): -        """Get the rxgain value in dB for the ODR-DabMod.""" -        # TODO handle failure -        return float(self.send_receive("get gain digital")[0]) +        self._mod_rc.set_param_value("sdr", "rxgain", "%.4f" % float(gain)) + +    def get_rxgain(self) -> float: +        """Get the rxgain value in dB, or -1 in case of error""" +        try: +            return float(self._mod_rc.get_param_value("sdr", "rxgain")) +        except ValueError as e: +            logging.warning(f"Adapt: get_rxgain error: {e}") +            return -1.0 + +    def set_digital_gain(self, gain: float) -> None: +        self._mod_rc.set_param_value("gain", "digital", "%.5f" % float(gain)) + +    def get_digital_gain(self) -> float: +        """Get the digital gain value in linear scale, or -1 in case +        of error""" +        try: +            return float(self._mod_rc.get_param_value("gain", "digital")) +        except ValueError as e: +            logging.warning(f"Adapt: get_digital_gain error: {e}") +            return -1.0      def get_predistorter(self):          """Load the coefficients from the file in the format given in the README,          return ("poly", [AM coef], [PM coef]) or ("lut", scalefactor, [LUT entries])          """ -        f = open(self.coef_path, 'r') -        lines = f.readlines() -        predistorter_format = int(lines[0]) -        if predistorter_format == FORMAT_POLY: -            coefs_am_out = [] -            coefs_pm_out = [] -            n_coefs = int(lines[1]) -            coefs = [float(l) for l in lines[2:]] -            i = 0 -            for c in coefs: -                if i < n_coefs: -                    coefs_am_out.append(c) -                elif i < 2 * n_coefs: -                    coefs_pm_out.append(c) -                else: -                    raise ValueError( -                        'Incorrect coef file format: too many' -                        ' coefficients in {}, should be {}, coefs are {}' -                            .format(self.coef_path, n_coefs, coefs)) -                i += 1 -            f.close() -            return 'poly', coefs_am_out, coefs_pm_out -        elif predistorter_format == FORMAT_LUT: -            scalefactor = int(lines[1]) -            coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32) -            coefs = coefs.reshape((-1, 2)) -            lut = coefs[..., 0] + 1j * coefs[..., 1] -            if len(lut) != LUT_LEN: -                raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN)) -            return 'lut', scalefactor, lut -        else: -            raise ValueError("Unknown predistorter format {}".format(predistorter_format)) +        with open(self._coef_path, 'r') as f: +            lines = f.readlines() +            predistorter_format = int(lines[0]) +            if predistorter_format == FORMAT_POLY: +                coefs_am_out = [] +                coefs_pm_out = [] +                n_coefs = int(lines[1]) +                coefs = [float(l) for l in lines[2:]] +                for i, c in enumerate(coefs): +                    if i < n_coefs: +                        coefs_am_out.append(c) +                    elif i < 2 * n_coefs: +                        coefs_pm_out.append(c) +                    else: +                        raise ValueError( +                            'Incorrect coef file format: too many' +                            ' coefficients in {}, should be {}, coefs are {}' +                                .format(self._coef_path, n_coefs, coefs)) +                return 'poly', coefs_am_out, coefs_pm_out +            elif predistorter_format == FORMAT_LUT: +                scalefactor = int(lines[1]) +                coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32) +                coefs = coefs.reshape((-1, 2)) +                lut = coefs[..., 0] + 1j * coefs[..., 1] +                if len(lut) != LUT_LEN: +                    raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN)) +                return 'lut', scalefactor, lut +            else: +                raise ValueError("Unknown predistorter format {}".format(predistorter_format)) -    def set_predistorter(self, dpddata): +    def set_predistorter(self, dpddata) -> None:          """Update the predistorter data in the modulator. Takes the same          tuple format as argument than the one returned get_predistorter()"""          if dpddata[0] == "poly":              coefs_am = dpddata[1]              coefs_pm = dpddata[2] -            _write_poly_coef_file(coefs_am, coefs_pm, self.coef_path) +            _write_poly_coef_file(coefs_am, coefs_pm, self._coef_path)          elif dpddata[0] == "lut":              scalefactor = dpddata[1]              lut = dpddata[2] -            _write_lut_file(scalefactor, lut, self.coef_path) +            _write_lut_file(scalefactor, lut, self._coef_path)          else:              raise ValueError("Unknown predistorter '{}'".format(dpddata[0])) -        return self.send_receive("set memlesspoly coeffile {}".format(self.coef_path)) +        self._mod_rc.set_param_value("memlesspoly", "coefffile", self._coef_path) -    def dump(self, path=None): +    def dump(self, path: str) -> None:          """Backup current settings to a file""" -        dt = datetime.datetime.now().isoformat() -        if path is None: -            if self.plot_location is not None: -                path = self.plot_location + "/" + dt + "_adapt.pkl" -            else: -                raise Exception("Cannot dump Adapt without either plot_location or path set") +          d = {              "txgain": self.get_txgain(),              "rxgain": self.get_rxgain(),              "digital_gain": self.get_digital_gain(),              "predistorter": self.get_predistorter()          } +          with open(path, "wb") as f:              pickle.dump(d, f) -        return path - -    def load(self, path): +    def load(self, path: str) -> None:          """Restore settings from a file"""          with open(path, "rb") as f:              d = pickle.load(f) -            self.set_txgain(d["txgain"]) +            self.set_txgain(0)              self.set_digital_gain(d["digital_gain"])              self.set_rxgain(d["rxgain"])              self.set_predistorter(d["predistorter"]) +            self.set_txgain(d["txgain"])  # The MIT License (MIT)  # -# Copyright (c) 2017 Andreas Steger, Matthias P. Braendli +# Copyright (c) 2018 Matthias P. Braendli +# Copyright (c) 2017 Andreas Steger  #  # Permission is hereby granted, free of charge, to any person obtaining a copy  # of this software and associated documentation files (the "Software"), to deal diff --git a/python/dpdce.py b/python/dpdce.py index 3b294eb..6c373fd 100755 --- a/python/dpdce.py +++ b/python/dpdce.py @@ -310,7 +310,9 @@ def engine_worker():                  txframe_aligned, tx_ts, rxframe_aligned, rx_ts, rx_median, tx_median = meas.get_samples()                  # Store all settings for pre-distortion, tx and rx -                adapt.dump() +                utctime = datetime.datetime.utcnow() +                dump_file = "adapt_{}.pkl".format(utctime.strftime("%s")) +                adapt.dump(os.path.join(plot_path, dump_file))                  # Collect logging data                  off = symbol_align.calc_offset(txframe_aligned) diff --git a/python/lib/zmqrc.py b/python/lib/zmqrc.py index 2d82b3e..423f91d 100644 --- a/python/lib/zmqrc.py +++ b/python/lib/zmqrc.py @@ -22,15 +22,16 @@  #   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>.  import zmq  import json +from typing import List -class ModRemoteControl(object): +class ModRemoteControl:      """Interact with ODR-DabMod using the ZMQ RC"""      def __init__(self, mod_host, mod_port=9400):          self._host = mod_host          self._port = mod_port          self._ctx = zmq.Context() -    def _read(self, message_parts): +    def _read(self, message_parts: List[str]):          sock = zmq.Socket(self._ctx, zmq.REQ)          sock.setsockopt(zmq.LINGER, 0)          sock.connect("tcp://{}:{}".format(self._host, self._port)) @@ -70,14 +71,14 @@ class ModRemoteControl(object):          return modules -    def get_param_value(self, module, param): +    def get_param_value(self, module: str, param: str) -> str:          value = self._read(['get', module, param])          if value[0] == 'fail':              raise ValueError("Error getting param: {}".format(value[1]))          else:              return value[0] -    def set_param_value(self, module, param, value): +    def set_param_value(self, module: str, param: str, value: str) -> None:          ret = self._read(['set', module, param, value])          if ret[0] == 'fail':              raise ValueError("Error setting param: {}".format(ret[1]))  | 
