diff options
Diffstat (limited to 'mpm/python/usrp_mpm/components.py')
-rw-r--r-- | mpm/python/usrp_mpm/components.py | 139 |
1 files changed, 131 insertions, 8 deletions
diff --git a/mpm/python/usrp_mpm/components.py b/mpm/python/usrp_mpm/components.py index 5d1a31325..a87dbdce7 100644 --- a/mpm/python/usrp_mpm/components.py +++ b/mpm/python/usrp_mpm/components.py @@ -7,6 +7,7 @@ MPM Component management """ import os +import re import shutil import subprocess from usrp_mpm.rpc_server import no_rpc @@ -30,6 +31,124 @@ class ZynqComponents(object): ########################################################################### # Component updating ########################################################################### + def _log_and_raise(self, logstr): + self.log.error(logstr) + raise RuntimeError(logstr) + + @classmethod + def _parse_dts_mpm_version_tag(cls, text): + """ parse a version line from the dts file. E.g. + "// mpm_version component1 3.4.5" will return + {"component1": (3, 4, 5)} """ + dts_version_re = re.compile(r'^// mpm_version\s+(?P<comp>\S+)\s+(?P<ver>\S+)$') + match = dts_version_re.match(text) + if match is None: + return (None, None) + + component = match[1] + version_list = [int(x, base=0) for x in match[2].split('.')] + return (component, tuple(version_list)) + + @classmethod + def _parse_dts_version_info_from_file(cls, filepath): + """ + parse all version informations from dts file and store in dict + a c-style comment in the dts file like this + // mpm_version component1 3.4.5 + // mpm_version other_component 3.5.0 + will return a dict: + {"component1": (3, 4, 5), "other_component": (3, 5, 0)"} + """ + suffix_current = "_current_version" + suffix_oldest = "_oldest_compatible_version" + + version_info = {} + with open(filepath) as f: + text = f.read() + for line in text.splitlines(): + component, version = cls._parse_dts_mpm_version_tag(line) + if not component: + continue + if component.endswith(suffix_oldest): + component = component[:-len(suffix_oldest)] + version_type = 'oldest' + elif component.endswith(suffix_current): + component = component[:-len(suffix_current)] + version_type = 'current' + else: + version_type = 'current' + if component not in version_info: + version_info[component] = {} + version_info[component][version_type] = version + return version_info + + def _verify_compatibility(self, filebasename, update_dict): + """ + check whether the given MPM compatibility matches the + version information stored in the FPGA DTS file + """ + def _get_version_string(versions_enum): + version_strings = [] + if 'current' in versions_enum: + version_strings.append("current: {}".format( + ".".join([str(x) for x in versions_enum['current']]))) + if 'oldest' in versions_enum: + version_strings.append("oldest compatible: {}".format( + ".".join([str(x) for x in versions_enum['oldest']]))) + return ', '.join(version_strings) + + + if update_dict.get('check_dts_for_compatibility'): + self.log.trace("Compatibility check MPM <-> FPGA via DTS enabled") + dtsfilepath = filebasename + '.dts' + if not os.path.exists(dtsfilepath): + self._log_and_raise("DTS file not found: {}".format(dtsfilepath)) + self.log.trace("Parse DTS file {} for version information"\ + .format(dtsfilepath)) + fpga_versions = self._parse_dts_version_info_from_file(dtsfilepath) + if not fpga_versions: + self._log_and_raise("no component version information in DTS file") + if 'compatibility' not in update_dict: + self._log_and_raise("MPM FPGA version infos not found") + mpm_versions = update_dict['compatibility'] + self.log.trace("DTS version infos: {}".format(fpga_versions)) + self.log.trace("MPM version infos: {}".format(mpm_versions)) + + try: + for component in mpm_versions.keys(): + # check for components that aren't available in the DTS file + if component in fpga_versions.keys(): + self.log.trace(f"check compatibility for: FPGA-{component}") + mpm_version = mpm_versions[component] + fpga_version = fpga_versions[component] + self.log.trace("mpm_version: current: {}, compatible: {}".format( + mpm_version['current'], mpm_version['oldest'])) + self.log.trace("fpga_version: current: {}, compatible: {}".format( + fpga_version['current'], fpga_version['oldest'])) + if mpm_version['oldest'][0] > fpga_version['current'][0]: + error = "Component {} is too old ({}, MPM version: {})".format( + component, + _get_version_string(fpga_version), + _get_version_string(mpm_version)) + self._log_and_raise(error) + elif mpm_version['current'][0] < fpga_version['oldest'][0]: + error = "Component {} is too new ({}, MPM version: {})".format( + component, + _get_version_string(fpga_version), + _get_version_string(mpm_version)) + self._log_and_raise(error) + self.log.trace(f"Component {component} is good!") + else: + self.log.warning(f"component {component} defined in "\ + f"MPM but not found in FPGA info, skipping.") + except RuntimeError as ex: + self._log_and_raise("MPM compatibility infos suggest that the "\ + "new bitfile is not compatible, skipping installation. {}"\ + .format(ex)) + else: + self.log.trace("Compatibility check MPM <-> FPGA is disabled") + return + @no_rpc def update_fpga(self, filepath, metadata): """ @@ -37,13 +156,19 @@ class ZynqComponents(object): :param filepath: path to new FPGA image :param metadata: Dictionary of strings containing metadata """ - self.log.trace("Updating FPGA with image at {} (metadata: `{}')" - .format(filepath, str(metadata))) - _, file_extension = os.path.splitext(filepath) + self.log.trace(f"Updating FPGA with image at {filepath}"\ + " (metadata: `{str(metadata)}')") + file_name, file_extension = os.path.splitext(filepath) + self.log.trace("file_name: {}".format(file_name)) # Cut off the period from the file extension file_extension = file_extension[1:].lower() - binfile_path = self.updateable_components['fpga']['path'].format( - self.device_info.get('product')) + if file_extension not in ['bit', 'bin']: + self._log_and_raise(f"Invalid FPGA bitfile: {filepath}") + binfile_path = self.updateable_components['fpga']['path']\ + .format(self.device_info.get('product')) + + self._verify_compatibility(file_name, self.updateable_components['fpga']) + if file_extension == "bit": self.log.trace("Converting bit to bin file and writing to {}" .format(binfile_path)) @@ -52,9 +177,7 @@ class ZynqComponents(object): elif file_extension == "bin": self.log.trace("Copying bin file to %s", binfile_path) shutil.copy(filepath, binfile_path) - else: - self.log.error("Invalid FPGA bitfile: %s", filepath) - raise RuntimeError("Invalid N3xx FPGA bitfile") + # RPC server will reload the periph manager after this. return True |