aboutsummaryrefslogtreecommitdiffstats
path: root/python/dpd/Adapt.py
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-01-23 11:00:02 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-01-23 11:00:02 +0100
commit201d711a1d3dfbe46d622871731005937598e790 (patch)
treee43a95ee027e1be6ca8621f9e2c78aaf932a3421 /python/dpd/Adapt.py
parent674228bedb325384f12602350ab36d075b5509a3 (diff)
parente0abfc3728fb56519fa2507d2468214e2a633c98 (diff)
downloaddabmod-201d711a1d3dfbe46d622871731005937598e790.tar.gz
dabmod-201d711a1d3dfbe46d622871731005937598e790.tar.bz2
dabmod-201d711a1d3dfbe46d622871731005937598e790.zip
Merge branch 'next' into lime
Diffstat (limited to 'python/dpd/Adapt.py')
-rw-r--r--python/dpd/Adapt.py291
1 files changed, 106 insertions, 185 deletions
diff --git a/python/dpd/Adapt.py b/python/dpd/Adapt.py
index 840aee9..a30f0c8 100644
--- a/python/dpd/Adapt.py
+++ b/python/dpd/Adapt.py
@@ -9,263 +9,184 @@ This module is used to change settings of ODR-DabMod using
the ZMQ remote control socket.
"""
-import zmq
import logging
import numpy as np
-import os
-import datetime
+import os.path
import pickle
+from lib import zmqrc
+from typing import List
LUT_LEN = 32
FORMAT_POLY = 1
FORMAT_LUT = 2
-def _write_poly_coef_file(coefs_am, coefs_pm, path):
- assert (len(coefs_am) == len(coefs_pm))
+def _write_poly_coef_file(coefs_am: List[float], coefs_pm: List[float], path: str) -> None:
+ assert len(coefs_am) == len(coefs_pm)
- f = open(path, 'w')
- f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am)))
- for coef in coefs_am:
- f.write("{}\n".format(coef))
- for coef in coefs_pm:
- f.write("{}\n".format(coef))
- f.close()
+ with open(path, 'w') as f:
+ f.write("{}\n{}\n".format(FORMAT_POLY, len(coefs_am)))
+ for coef in coefs_am:
+ f.write("{}\n".format(coef))
+ for coef in coefs_pm:
+ f.write("{}\n".format(coef))
-def _write_lut_file(scalefactor, lut, path):
- assert (len(lut) == LUT_LEN)
+def _write_lut_file(scalefactor: float, lut: List[complex], path: str) -> None:
+ assert len(lut) == LUT_LEN
- f = open(path, 'w')
- f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor))
- for coef in lut:
- f.write("{}\n{}\n".format(coef.real, coef.imag))
- f.close()
+ with open(path, 'w') as f:
+ f.write("{}\n{}\n".format(FORMAT_LUT, scalefactor))
+ for coef in lut:
+ f.write("{}\n{}\n".format(coef.real, coef.imag))
-def dpddata_to_str(dpddata):
+def dpddata_to_str(dpddata) -> str:
if dpddata[0] == "poly":
coefs_am = dpddata[1]
coefs_pm = dpddata[2]
- return "dpd_coefs_am {}, dpd_coefs_pm {}".format(
+ return "Poly: AM/AM {}, AM/PM {}".format(
coefs_am, coefs_pm)
elif dpddata[0] == "lut":
scalefactor = dpddata[1]
lut = dpddata[2]
- return "LUT scalefactor {}, LUT {}".format(
+ return "LUT: scalefactor {}, LUT {}".format(
scalefactor, lut)
else:
raise ValueError("Unknown dpddata type {}".format(dpddata[0]))
class Adapt:
- """Uses the ZMQ remote control to change parameters of the DabMod
+ """Uses the ZMQ remote control to change parameters of the DabMod """
- Parameters
- ----------
- port : int
- Port at which the ODR-DabMod is listening to connect the
- ZMQ remote control.
- """
-
- def __init__(self, port, coef_path, plot_location):
+ def __init__(self, port: int, coef_path: str, plot_location: str):
logging.debug("Instantiate Adapt object")
- self.port = port
- self.coef_path = coef_path
- self.plot_location = plot_location
- self.host = "localhost"
- self._context = zmq.Context()
-
- def _connect(self):
- """Establish the connection to ODR-DabMod using
- a ZMQ socket that is in request mode (Client).
- Returns a socket"""
- sock = self._context.socket(zmq.REQ)
- poller = zmq.Poller()
- poller.register(sock, zmq.POLLIN)
-
- sock.connect("tcp://%s:%d" % (self.host, self.port))
-
- sock.send(b"ping")
-
- socks = dict(poller.poll(1000))
- if socks:
- if socks.get(sock) == zmq.POLLIN:
- data = [el.decode() for el in sock.recv_multipart()]
-
- if data != ['ok']:
- raise RuntimeError(
- "Invalid ZMQ RC answer to 'ping' at %s %d: %s" %
- (self.host, self.port, data))
- else:
- sock.close(linger=10)
- raise RuntimeError(
- "ZMQ RC does not respond to 'ping' at %s %d" %
- (self.host, self.port))
-
- return sock
-
- def send_receive(self, message):
- """Send a message to ODR-DabMod. It always
- returns the answer ODR-DabMod sends back.
+ self._port = port
+ self._coef_path = coef_path
+ self._plot_location = plot_location
+ self._host = "localhost"
+ self._mod_rc = zmqrc.ModRemoteControl(self._host, self._port)
- An example message could be
- "get sdr txgain" or "set sdr txgain 50"
-
- Parameter
- ---------
- message : str
- The message string that will be sent to the receiver.
- """
- sock = self._connect()
- logging.debug("Send message: %s" % message)
- msg_parts = message.split(" ")
- for i, part in enumerate(msg_parts):
- if i == len(msg_parts) - 1:
- f = 0
- else:
- f = zmq.SNDMORE
-
- sock.send(part.encode(), flags=f)
-
- data = [el.decode() for el in sock.recv_multipart()]
- logging.debug("Received message: %s" % message)
- return data
-
- def set_txgain(self, gain):
- """Set a new txgain for the ODR-DabMod.
-
- Parameters
- ----------
- gain : int
- new TX gain, in the same format as ODR-DabMod's config file
- """
+ def set_txgain(self, gain : float) -> None:
# TODO this is specific to the B200
if gain < 0 or gain > 89:
raise ValueError("Gain has to be in [0,89]")
- return self.send_receive("set sdr txgain %.4f" % float(gain))
+ self._mod_rc.set_param_value("sdr", "txgain", "%.4f" % float(gain))
- def get_txgain(self):
- """Get the txgain value in dB for the ODR-DabMod."""
- # TODO handle failure
- return float(self.send_receive("get sdr txgain")[0])
+ def get_txgain(self) -> float:
+ """Get the txgain value in dB, or -1 in case of error"""
+ try:
+ return float(self._mod_rc.get_param_value("sdr", "txgain"))
+ except ValueError as e:
+ logging.warning(f"Adapt: get_txgain error: {e}")
+ return -1.0
- def set_rxgain(self, gain):
- """Set a new rxgain for the ODR-DabMod.
-
- Parameters
- ----------
- gain : int
- new RX gain, in the same format as ODR-DabMod's config file
- """
+ def set_rxgain(self, gain: float) -> None:
# TODO this is specific to the B200
if gain < 0 or gain > 89:
raise ValueError("Gain has to be in [0,89]")
- return self.send_receive("set sdr rxgain %.4f" % float(gain))
-
- def get_rxgain(self):
- """Get the rxgain value in dB for the ODR-DabMod."""
- # TODO handle failure
- return float(self.send_receive("get sdr rxgain")[0])
-
- def set_digital_gain(self, gain):
- """Set a new rxgain for the ODR-DabMod.
-
- Parameters
- ----------
- gain : int
- new RX gain, in the same format as ODR-DabMod's config file
- """
- msg = "set gain digital %.5f" % gain
- return self.send_receive(msg)
-
- def get_digital_gain(self):
- """Get the rxgain value in dB for the ODR-DabMod."""
- # TODO handle failure
- return float(self.send_receive("get gain digital")[0])
+ self._mod_rc.set_param_value("sdr", "rxgain", "%.4f" % float(gain))
+
+ def get_rxgain(self) -> float:
+ """Get the rxgain value in dB, or -1 in case of error"""
+ try:
+ return float(self._mod_rc.get_param_value("sdr", "rxgain"))
+ except ValueError as e:
+ logging.warning(f"Adapt: get_rxgain error: {e}")
+ return -1.0
+
+ def set_digital_gain(self, gain: float) -> None:
+ self._mod_rc.set_param_value("gain", "digital", "%.5f" % float(gain))
+
+ def get_digital_gain(self) -> float:
+ """Get the digital gain value in linear scale, or -1 in case
+ of error"""
+ try:
+ return float(self._mod_rc.get_param_value("gain", "digital"))
+ except ValueError as e:
+ logging.warning(f"Adapt: get_digital_gain error: {e}")
+ return -1.0
def get_predistorter(self):
"""Load the coefficients from the file in the format given in the README,
return ("poly", [AM coef], [PM coef]) or ("lut", scalefactor, [LUT entries])
"""
- f = open(self.coef_path, 'r')
- lines = f.readlines()
- predistorter_format = int(lines[0])
- if predistorter_format == FORMAT_POLY:
- coefs_am_out = []
- coefs_pm_out = []
- n_coefs = int(lines[1])
- coefs = [float(l) for l in lines[2:]]
- i = 0
- for c in coefs:
- if i < n_coefs:
- coefs_am_out.append(c)
- elif i < 2 * n_coefs:
- coefs_pm_out.append(c)
- else:
- raise ValueError(
- 'Incorrect coef file format: too many'
- ' coefficients in {}, should be {}, coefs are {}'
- .format(self.coef_path, n_coefs, coefs))
- i += 1
- f.close()
- return 'poly', coefs_am_out, coefs_pm_out
- elif predistorter_format == FORMAT_LUT:
- scalefactor = int(lines[1])
- coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32)
- coefs = coefs.reshape((-1, 2))
- lut = coefs[..., 0] + 1j * coefs[..., 1]
- if len(lut) != LUT_LEN:
- raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN))
- return 'lut', scalefactor, lut
- else:
- raise ValueError("Unknown predistorter format {}".format(predistorter_format))
+ with open(self._coef_path, 'r') as f:
+ lines = f.readlines()
+ predistorter_format = int(lines[0])
+ if predistorter_format == FORMAT_POLY:
+ coefs_am_out = []
+ coefs_pm_out = []
+ n_coefs = int(lines[1])
+ coefs = [float(l) for l in lines[2:]]
+ for i, c in enumerate(coefs):
+ if i < n_coefs:
+ coefs_am_out.append(c)
+ elif i < 2 * n_coefs:
+ coefs_pm_out.append(c)
+ else:
+ raise ValueError(
+ 'Incorrect coef file format: too many'
+ ' coefficients in {}, should be {}, coefs are {}'
+ .format(self._coef_path, n_coefs, coefs))
+ return 'poly', coefs_am_out, coefs_pm_out
+ elif predistorter_format == FORMAT_LUT:
+ scalefactor = int(lines[1])
+ coefs = np.array([float(l) for l in lines[2:]], dtype=np.float32)
+ coefs = coefs.reshape((-1, 2))
+ lut = coefs[..., 0] + 1j * coefs[..., 1]
+ if len(lut) != LUT_LEN:
+ raise ValueError("Incorrect number of LUT entries ({} expected {})".format(len(lut), LUT_LEN))
+ return 'lut', scalefactor, lut
+ else:
+ raise ValueError("Unknown predistorter format {}".format(predistorter_format))
- def set_predistorter(self, dpddata):
+ def set_predistorter(self, dpddata) -> None:
"""Update the predistorter data in the modulator. Takes the same
tuple format as argument than the one returned get_predistorter()"""
if dpddata[0] == "poly":
coefs_am = dpddata[1]
coefs_pm = dpddata[2]
- _write_poly_coef_file(coefs_am, coefs_pm, self.coef_path)
+ _write_poly_coef_file(coefs_am, coefs_pm, self._coef_path)
elif dpddata[0] == "lut":
scalefactor = dpddata[1]
lut = dpddata[2]
- _write_lut_file(scalefactor, lut, self.coef_path)
+ _write_lut_file(scalefactor, lut, self._coef_path)
else:
raise ValueError("Unknown predistorter '{}'".format(dpddata[0]))
- self.send_receive("set memlesspoly coeffile {}".format(self.coef_path))
+ self._mod_rc.set_param_value("memlesspoly", "coeffile", self._coef_path)
- def dump(self, path=None):
+ def dump(self, path: str) -> None:
"""Backup current settings to a file"""
- dt = datetime.datetime.now().isoformat()
- if path is None:
- if self.plot_location is not None:
- path = self.plot_location + "/" + dt + "_adapt.pkl"
- else:
- raise Exception("Cannot dump Adapt without either plot_location or path set")
+
d = {
"txgain": self.get_txgain(),
"rxgain": self.get_rxgain(),
"digital_gain": self.get_digital_gain(),
- "predistorter": self.get_predistorter()
+ "dpddata": self.get_predistorter()
}
+
with open(path, "wb") as f:
pickle.dump(d, f)
- return path
-
- def load(self, path):
+ def restore(self, path: str):
"""Restore settings from a file"""
with open(path, "rb") as f:
d = pickle.load(f)
- self.set_txgain(d["txgain"])
+ self.set_txgain(0)
+
+ # If any of the following fail, we will be running
+ # with the safe value of txgain=0
self.set_digital_gain(d["digital_gain"])
self.set_rxgain(d["rxgain"])
- self.set_predistorter(d["predistorter"])
+ self.set_predistorter(d["dpddata"])
+ self.set_txgain(d["txgain"])
+
+ return d
# The MIT License (MIT)
#
-# Copyright (c) 2017 Andreas Steger, Matthias P. Braendli
+# Copyright (c) 2019 Matthias P. Braendli
+# Copyright (c) 2017 Andreas Steger
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal