aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/components.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/components.py')
-rw-r--r--mpm/python/usrp_mpm/components.py139
1 files changed, 131 insertions, 8 deletions
diff --git a/mpm/python/usrp_mpm/components.py b/mpm/python/usrp_mpm/components.py
index 5d1a31325..a87dbdce7 100644
--- a/mpm/python/usrp_mpm/components.py
+++ b/mpm/python/usrp_mpm/components.py
@@ -7,6 +7,7 @@
MPM Component management
"""
import os
+import re
import shutil
import subprocess
from usrp_mpm.rpc_server import no_rpc
@@ -30,6 +31,124 @@ class ZynqComponents(object):
###########################################################################
# Component updating
###########################################################################
+ def _log_and_raise(self, logstr):
+ self.log.error(logstr)
+ raise RuntimeError(logstr)
+
+ @classmethod
+ def _parse_dts_mpm_version_tag(cls, text):
+ """ parse a version line from the dts file. E.g.
+ "// mpm_version component1 3.4.5" will return
+ {"component1": (3, 4, 5)} """
+ dts_version_re = re.compile(r'^// mpm_version\s+(?P<comp>\S+)\s+(?P<ver>\S+)$')
+ match = dts_version_re.match(text)
+ if match is None:
+ return (None, None)
+
+ component = match[1]
+ version_list = [int(x, base=0) for x in match[2].split('.')]
+ return (component, tuple(version_list))
+
+ @classmethod
+ def _parse_dts_version_info_from_file(cls, filepath):
+ """
+ parse all version informations from dts file and store in dict
+ a c-style comment in the dts file like this
+ // mpm_version component1 3.4.5
+ // mpm_version other_component 3.5.0
+ will return a dict:
+ {"component1": (3, 4, 5), "other_component": (3, 5, 0)"}
+ """
+ suffix_current = "_current_version"
+ suffix_oldest = "_oldest_compatible_version"
+
+ version_info = {}
+ with open(filepath) as f:
+ text = f.read()
+ for line in text.splitlines():
+ component, version = cls._parse_dts_mpm_version_tag(line)
+ if not component:
+ continue
+ if component.endswith(suffix_oldest):
+ component = component[:-len(suffix_oldest)]
+ version_type = 'oldest'
+ elif component.endswith(suffix_current):
+ component = component[:-len(suffix_current)]
+ version_type = 'current'
+ else:
+ version_type = 'current'
+ if component not in version_info:
+ version_info[component] = {}
+ version_info[component][version_type] = version
+ return version_info
+
+ def _verify_compatibility(self, filebasename, update_dict):
+ """
+ check whether the given MPM compatibility matches the
+ version information stored in the FPGA DTS file
+ """
+ def _get_version_string(versions_enum):
+ version_strings = []
+ if 'current' in versions_enum:
+ version_strings.append("current: {}".format(
+ ".".join([str(x) for x in versions_enum['current']])))
+ if 'oldest' in versions_enum:
+ version_strings.append("oldest compatible: {}".format(
+ ".".join([str(x) for x in versions_enum['oldest']])))
+ return ', '.join(version_strings)
+
+
+ if update_dict.get('check_dts_for_compatibility'):
+ self.log.trace("Compatibility check MPM <-> FPGA via DTS enabled")
+ dtsfilepath = filebasename + '.dts'
+ if not os.path.exists(dtsfilepath):
+ self._log_and_raise("DTS file not found: {}".format(dtsfilepath))
+ self.log.trace("Parse DTS file {} for version information"\
+ .format(dtsfilepath))
+ fpga_versions = self._parse_dts_version_info_from_file(dtsfilepath)
+ if not fpga_versions:
+ self._log_and_raise("no component version information in DTS file")
+ if 'compatibility' not in update_dict:
+ self._log_and_raise("MPM FPGA version infos not found")
+ mpm_versions = update_dict['compatibility']
+ self.log.trace("DTS version infos: {}".format(fpga_versions))
+ self.log.trace("MPM version infos: {}".format(mpm_versions))
+
+ try:
+ for component in mpm_versions.keys():
+ # check for components that aren't available in the DTS file
+ if component in fpga_versions.keys():
+ self.log.trace(f"check compatibility for: FPGA-{component}")
+ mpm_version = mpm_versions[component]
+ fpga_version = fpga_versions[component]
+ self.log.trace("mpm_version: current: {}, compatible: {}".format(
+ mpm_version['current'], mpm_version['oldest']))
+ self.log.trace("fpga_version: current: {}, compatible: {}".format(
+ fpga_version['current'], fpga_version['oldest']))
+ if mpm_version['oldest'][0] > fpga_version['current'][0]:
+ error = "Component {} is too old ({}, MPM version: {})".format(
+ component,
+ _get_version_string(fpga_version),
+ _get_version_string(mpm_version))
+ self._log_and_raise(error)
+ elif mpm_version['current'][0] < fpga_version['oldest'][0]:
+ error = "Component {} is too new ({}, MPM version: {})".format(
+ component,
+ _get_version_string(fpga_version),
+ _get_version_string(mpm_version))
+ self._log_and_raise(error)
+ self.log.trace(f"Component {component} is good!")
+ else:
+ self.log.warning(f"component {component} defined in "\
+ f"MPM but not found in FPGA info, skipping.")
+ except RuntimeError as ex:
+ self._log_and_raise("MPM compatibility infos suggest that the "\
+ "new bitfile is not compatible, skipping installation. {}"\
+ .format(ex))
+ else:
+ self.log.trace("Compatibility check MPM <-> FPGA is disabled")
+ return
+
@no_rpc
def update_fpga(self, filepath, metadata):
"""
@@ -37,13 +156,19 @@ class ZynqComponents(object):
:param filepath: path to new FPGA image
:param metadata: Dictionary of strings containing metadata
"""
- self.log.trace("Updating FPGA with image at {} (metadata: `{}')"
- .format(filepath, str(metadata)))
- _, file_extension = os.path.splitext(filepath)
+ self.log.trace(f"Updating FPGA with image at {filepath}"\
+ " (metadata: `{str(metadata)}')")
+ file_name, file_extension = os.path.splitext(filepath)
+ self.log.trace("file_name: {}".format(file_name))
# Cut off the period from the file extension
file_extension = file_extension[1:].lower()
- binfile_path = self.updateable_components['fpga']['path'].format(
- self.device_info.get('product'))
+ if file_extension not in ['bit', 'bin']:
+ self._log_and_raise(f"Invalid FPGA bitfile: {filepath}")
+ binfile_path = self.updateable_components['fpga']['path']\
+ .format(self.device_info.get('product'))
+
+ self._verify_compatibility(file_name, self.updateable_components['fpga'])
+
if file_extension == "bit":
self.log.trace("Converting bit to bin file and writing to {}"
.format(binfile_path))
@@ -52,9 +177,7 @@ class ZynqComponents(object):
elif file_extension == "bin":
self.log.trace("Copying bin file to %s", binfile_path)
shutil.copy(filepath, binfile_path)
- else:
- self.log.error("Invalid FPGA bitfile: %s", filepath)
- raise RuntimeError("Invalid N3xx FPGA bitfile")
+
# RPC server will reload the periph manager after this.
return True