aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/periph_manager/e320.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/periph_manager/e320.py')
-rw-r--r--mpm/python/usrp_mpm/periph_manager/e320.py671
1 files changed, 671 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/e320.py b/mpm/python/usrp_mpm/periph_manager/e320.py
new file mode 100644
index 000000000..ac50909ff
--- /dev/null
+++ b/mpm/python/usrp_mpm/periph_manager/e320.py
@@ -0,0 +1,671 @@
+#
+# Copyright 2018 Ettus Research, a National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+E320 implementation module
+"""
+
+from __future__ import print_function
+import bisect
+import copy
+import threading
+from six import iteritems, itervalues
+from usrp_mpm.components import ZynqComponents
+from usrp_mpm.dboard_manager import Neon
+from usrp_mpm.mpmtypes import SID
+from usrp_mpm.mpmutils import assert_compat_number, str2bool
+from usrp_mpm.periph_manager import PeriphManagerBase
+from usrp_mpm.rpc_server import no_rpc
+from usrp_mpm.sys_utils import dtoverlay
+from usrp_mpm.sys_utils.udev import get_spidev_nodes
+from usrp_mpm.xports import XportMgrUDP, XportMgrLiberio
+from usrp_mpm.periph_manager.e320_periphs import MboardRegsControl
+
+E320_DEFAULT_INT_CLOCK_FREQ = 20e6
+E320_DEFAULT_EXT_CLOCK_FREQ = 10e6
+E320_DEFAULT_CLOCK_SOURCE = 'internal'
+E320_DEFAULT_TIME_SOURCE = 'internal'
+E320_DEFAULT_ENABLE_GPS = True
+E320_DEFAULT_FPGPIO_VOLTAGE = 0
+E320_FPGA_COMPAT = (3, 0)
+E320_MONITOR_THREAD_INTERVAL = 1.0 # seconds # TODO Verify this
+E320_DBOARD_SLOT_IDX = 0
+
+
+###############################################################################
+# Transport managers
+###############################################################################
+class E320XportMgrUDP(XportMgrUDP):
+ "E320-specific UDP configuration"
+ xbar_dev = "/dev/crossbar0"
+ iface_config = {
+ 'sfp0': {
+ 'label': 'misc-enet-regs',
+ 'xbar': 0,
+ 'xbar_port': 0,
+ 'ctrl_src_addr': 0,
+ }
+ }
+
+class E320XportMgrLiberio(XportMgrLiberio):
+ " E320-specific Liberio configuration "
+ max_chan = 6
+ xbar_dev = "/dev/crossbar0"
+ xbar_port = 1
+
+###############################################################################
+# Main Class
+###############################################################################
+class e320(ZynqComponents, PeriphManagerBase):
+ """
+ Holds E320 specific attributes and methods
+ """
+ #########################################################################
+ # Overridables
+ #
+ # See PeriphManagerBase for documentation on these fields
+ #########################################################################
+ description = "E300-Series Device"
+ pids = {0xe320: 'e320'}
+ mboard_eeprom_addr = "e0004000.i2c"
+ mboard_eeprom_offset = 0
+ mboard_eeprom_max_len = 256
+ mboard_info = {"type": "e3xx",
+ "product": "e320"
+ }
+ mboard_max_rev = 2 # RevB
+ mboard_sensor_callback_map = {
+ # FIXME add sensors
+ }
+ max_num_dboards = 1
+ crossbar_base_port = 2 # It's 2 because 0,1 are SFP,DMA
+
+ # We're on a Zynq target, so the following two come from the Zynq standard
+ # device tree overlay (tree/arch/arm/boot/dts/zynq-7000.dtsi)
+ dboard_spimaster_addrs = ["e0006000.spi", "e0007000.spi"]
+ # E320-specific settings
+ # Label for the mboard UIO
+ mboard_regs_label = "mboard-regs"
+ # Override the list of updateable components
+ updateable_components = {
+ 'fpga': {
+ 'callback': "update_fpga",
+ 'path': '/lib/firmware/{}.bin',
+ 'reset': True,
+ },
+ 'dts': {
+ 'callback': "update_dts",
+ 'path': '/lib/firmware/{}.dts',
+ 'output': '/lib/firmware/{}.dtbo',
+ 'reset': False,
+ },
+ }
+
+ @staticmethod
+ def list_required_dt_overlays(device_info):
+ """
+ Lists device tree overlays that need to be applied before this class can
+ be used. List of strings.
+ Are applied in order.
+
+ eeprom_md -- Dictionary of info read out from the mboard EEPROM
+ device_args -- Arbitrary dictionary of info, typically user-defined
+ """
+ return [device_info['product']]
+
+ ###########################################################################
+ # Ctor and device initialization tasks
+ ###########################################################################
+ def __init__(self, args):
+ super(e320, self).__init__(args)
+ if not self._device_initialized:
+ # Don't try and figure out what's going on. Just give up.
+ return
+ self._tear_down = False
+ self._status_monitor_thread = None
+ self._ext_clock_freq = E320_DEFAULT_EXT_CLOCK_FREQ
+ self._clock_source = None
+ self._time_source = None
+ self._available_endpoints = list(range(256))
+ self.dboard = self.dboards[E320_DBOARD_SLOT_IDX]
+ try:
+ self._init_peripherals(args)
+ except Exception as ex:
+ self.log.error("Failed to initialize motherboard: %s", str(ex))
+ self._initialization_status = str(ex)
+ self._device_initialized = False
+
+ def _init_dboards(self, _, override_dboard_pids, default_args):
+ """
+ Initialize all the daughterboards
+
+ (dboard_infos) -- N/A
+ override_dboard_pids -- List of dboard PIDs to force
+ default_args -- Default args
+ """
+ # Override the base class's implementation in order to avoid initializing our one "dboard"
+ # in the same way that, for example, N310's dboards are initialized. Specifically,
+ # - skip dboard EEPROM setup (we don't have one)
+ # - change the way we handle SPI devices
+ if override_dboard_pids:
+ self.log.warning("Overriding daughterboard PIDs with: {}"
+ .format(override_dboard_pids))
+ raise NotImplementedError("Can't override dboard pids")
+ # The DBoard PID is the same as the MBoard PID
+ db_pid = list(self.pids.keys())[0]
+ # Set up the SPI nodes
+ spi_nodes = []
+ for spi_addr in self.dboard_spimaster_addrs:
+ for spi_node in get_spidev_nodes(spi_addr):
+ bisect.insort(spi_nodes, spi_node)
+ self.log.trace("Found spidev nodes: {0}".format(spi_nodes))
+
+ if not spi_nodes:
+ self.log.warning("No SPI nodes for dboard %d.", E320_DBOARD_SLOT_IDX)
+ dboard_info = {
+ 'eeprom_md': self.mboard_info,
+ 'eeprom_rawdata': self._eeprom_rawdata,
+ 'pid': db_pid,
+ 'spi_nodes': spi_nodes,
+ 'default_args': default_args,
+ }
+ # This will actually instantiate the dboard class:
+ self.dboards.append(Neon(E320_DBOARD_SLOT_IDX, **dboard_info))
+ self.log.info("Found %d daughterboard(s).", len(self.dboards))
+
+ def _check_fpga_compat(self):
+ " Throw an exception if the compat numbers don't match up "
+ actual_compat = self.mboard_regs_control.get_compat_number()
+ self.log.debug("Actual FPGA compat number: {:d}.{:d}".format(
+ actual_compat[0], actual_compat[1]
+ ))
+ assert_compat_number(
+ E320_FPGA_COMPAT,
+ self.mboard_regs_control.get_compat_number(),
+ component="FPGA",
+ fail_on_old_minor=True,
+ log=self.log
+ )
+
+ def _init_ref_clock_and_time(self, default_args):
+ """
+ Initialize clock and time sources. After this function returns, the
+ reference signals going to the FPGA are valid.
+ """
+ self._ext_clock_freq = float(
+ default_args.get('ext_clock_freq', E320_DEFAULT_EXT_CLOCK_FREQ)
+ )
+ if not self.dboards:
+ self.log.warning(
+ "No dboards found, skipping setting clock and time source "
+ "configuration."
+ )
+ self._clock_source = E320_DEFAULT_CLOCK_SOURCE
+ self._time_source = E320_DEFAULT_TIME_SOURCE
+ else:
+ self.set_clock_source(
+ default_args.get('clock_source', E320_DEFAULT_CLOCK_SOURCE)
+ )
+ self.set_time_source(
+ default_args.get('time_source', E320_DEFAULT_TIME_SOURCE)
+ )
+
+ def _monitor_status(self):
+ """
+ Status monitoring thread: This should be executed in a thread. It will
+ continuously monitor status of the following peripherals:
+
+ - GPS lock
+ """
+ self.log.trace("Launching monitor loop...")
+ cond = threading.Condition()
+ cond.acquire()
+ while not self._tear_down:
+ gps_locked = self.get_gps_lock_sensor()['value'] == 'true'
+ # Now wait
+ if cond.wait_for(
+ lambda: self._tear_down,
+ E320_MONITOR_THREAD_INTERVAL):
+ break
+ cond.release()
+ self.log.trace("Terminating monitor loop.")
+
+ def _init_peripherals(self, args):
+ """
+ Turn on all peripherals. This may throw an error on failure, so make
+ sure to catch it.
+
+ Peripherals are initialized in the order of least likely to fail, to most
+ likely.
+ """
+ # Sanity checks
+ assert self.mboard_info.get('product') in self.pids.values(), \
+ "Device product could not be determined!"
+ # Init Mboard Regs
+ self.mboard_regs_control = MboardRegsControl(
+ self.mboard_regs_label, self.log)
+ self.mboard_regs_control.get_git_hash()
+ self.mboard_regs_control.get_build_timestamp()
+ self._check_fpga_compat()
+ self._update_fpga_type()
+ # Init peripherals
+ self.enable_gps(
+ enable=str2bool(
+ args.get('enable_gps', E320_DEFAULT_ENABLE_GPS)
+ )
+ )
+ self.enable_fp_gpio(
+ voltage=args.get(
+ 'fp_gpio_voltage',
+ E320_DEFAULT_FPGPIO_VOLTAGE
+ )
+ )
+ # Init clocking
+ self._init_ref_clock_and_time(args)
+ # Init CHDR transports
+ self._xport_mgrs = {
+ 'udp': E320XportMgrUDP(self.log.getChild('UDP')),
+ 'liberio': E320XportMgrLiberio(self.log.getChild('liberio')),
+ }
+ # Spawn status monitoring thread
+ self.log.trace("Spawning status monitor thread...")
+ self._status_monitor_thread = threading.Thread(
+ target=self._monitor_status,
+ name="E320StatusMonitorThread",
+ daemon=True,
+ )
+ self._status_monitor_thread.start()
+ # Init complete.
+ self.log.debug("mboard info: {}".format(self.mboard_info))
+
+ ###########################################################################
+ # Session init and deinit
+ ###########################################################################
+ def init(self, args):
+ """
+ Calls init() on the parent class, and then programs the Ethernet
+ dispatchers accordingly.
+ """
+ if not self._device_initialized:
+ self.log.warning(
+ "Cannot run init(), device was never fully initialized!")
+ return False
+ if args.get("clock_source", "") != "":
+ self.set_clock_source(args.get("clock_source"))
+ if args.get("time_source", "") != "":
+ self.set_time_source(args.get("time_source"))
+ result = super(e320, self).init(args)
+ for xport_mgr in itervalues(self._xport_mgrs):
+ xport_mgr.init(args)
+ return result
+
+ def deinit(self):
+ """
+ Clean up after a UHD session terminates.
+ """
+ if not self._device_initialized:
+ self.log.warning(
+ "Cannot run deinit(), device was never fully initialized!")
+ return
+ super(e320, self).deinit()
+ for xport_mgr in itervalues(self._xport_mgrs):
+ xport_mgr.deinit()
+ self.log.trace("Resetting SID pool...")
+ self._available_endpoints = list(range(256))
+
+ def tear_down(self):
+ """
+ Tear down all members that need to be specially handled before
+ deconstruction.
+ For E320, this means the overlay.
+ """
+ self.log.trace("Tearing down E320 device...")
+ self._tear_down = True
+ if self._device_initialized:
+ self._status_monitor_thread.join(3 * E320_MONITOR_THREAD_INTERVAL)
+ if self._status_monitor_thread.is_alive():
+ self.log.error("Could not terminate monitor thread! This could result in resource leaks.")
+ active_overlays = self.list_active_overlays()
+ self.log.trace("E320 has active device tree overlays: {}".format(
+ active_overlays
+ ))
+ for overlay in active_overlays:
+ dtoverlay.rm_overlay(overlay)
+
+ ###########################################################################
+ # Transport API
+ ###########################################################################
+ def request_xport(
+ self,
+ dst_address,
+ suggested_src_address,
+ xport_type
+ ):
+ """
+ See PeriphManagerBase.request_xport() for docs.
+ """
+ # Try suggested address first, then just pick the first available one:
+ src_address = suggested_src_address
+ if src_address not in self._available_endpoints:
+ if not self._available_endpoints:
+ raise RuntimeError(
+ "Depleted pool of SID endpoints for this device!")
+ else:
+ src_address = self._available_endpoints[0]
+ sid = SID(src_address << 16 | dst_address)
+ # Note: This SID may change its source address!
+ self.log.trace(
+ "request_xport(dst=0x%04X, suggested_src_address=0x%04X, xport_type=%s): " \
+ "operating on temporary SID: %s",
+ dst_address, suggested_src_address, str(xport_type), str(sid))
+ # FIXME token!
+ assert self.mboard_info['rpc_connection'] in ('remote', 'local')
+ if self.mboard_info['rpc_connection'] == 'remote':
+ return self._xport_mgrs['udp'].request_xport(
+ sid,
+ xport_type,
+ )
+ elif self.mboard_info['rpc_connection'] == 'local':
+ return self._xport_mgrs['liberio'].request_xport(
+ sid,
+ xport_type,
+ )
+
+ def commit_xport(self, xport_info):
+ """
+ See PeriphManagerBase.commit_xport() for docs.
+
+ Reminder: All connections are incoming, i.e. "send" or "TX" means
+ remote device to local device, and "receive" or "RX" means this local
+ device to remote device. "Remote device" can be, for example, a UHD
+ session.
+ """
+ ## Go, go, go
+ assert self.mboard_info['rpc_connection'] in ('remote', 'local')
+ sid = SID(xport_info['send_sid'])
+ self._available_endpoints.remove(sid.src_ep)
+ self.log.debug("Committing transport for SID %s, xport info: %s",
+ str(sid), str(xport_info))
+ if self.mboard_info['rpc_connection'] == 'remote':
+ return self._xport_mgrs['udp'].commit_xport(sid, xport_info)
+ elif self.mboard_info['rpc_connection'] == 'local':
+ return self._xport_mgrs['liberio'].commit_xport(sid, xport_info)
+
+ ###########################################################################
+ # Device info
+ ###########################################################################
+ def get_device_info_dyn(self):
+ """
+ Append the device info with current IP addresses.
+ """
+ if not self._device_initialized:
+ return {}
+ device_info = self._xport_mgrs['udp'].get_xport_info()
+ device_info.update({
+ 'fpga_version': "{}.{}".format(
+ *self.mboard_regs_control.get_compat_number()),
+ 'fpga': self.updateable_components.get('fpga', {}).get('type',""),
+ })
+ return device_info
+
+ ###########################################################################
+ # Clock/Time API
+ ###########################################################################
+ def get_clock_sources(self):
+ " Lists all available clock sources. "
+ self.log.trace("Listing available clock sources...")
+ return ('external', 'internal', 'gpsdo')
+
+ def get_clock_source(self):
+ " Returns the currently selected clock source "
+ return self._clock_source
+
+ def set_clock_source(self, *args):
+ """
+ Switch reference clock.
+
+ Throws if clock_source is not a valid value.
+ """
+ clock_source = args[0]
+ assert clock_source in self.get_clock_sources()
+ self.log.debug("Setting clock source to `{}'".format(clock_source))
+ if clock_source == self.get_clock_source():
+ self.log.trace("Nothing to do -- clock source already set.")
+ return
+ self._clock_source = clock_source
+ ref_clk_freq = self.get_ref_clock_freq()
+ self.mboard_regs_control.set_clock_source(clock_source, ref_clk_freq)
+ self.log.debug("Reference clock frequency is: {} MHz".format(
+ ref_clk_freq/1e6
+ ))
+ self.dboard.update_ref_clock_freq(ref_clk_freq)
+
+ def set_ref_clock_freq(self, freq):
+ """
+ Tell our USRP what the frequency of the external reference clock is.
+
+ Will throw if it's not a valid value.
+ """
+ # Other frequencies have not been tested
+ assert freq in (10e6, 20e6)
+ self.log.debug("We've been told the external reference clock " \
+ "frequency is {} MHz.".format(freq / 1e6))
+ if self._ext_clock_freq == freq:
+ self.log.trace("New external reference clock frequency " \
+ "assignment matches previous assignment. Ignoring " \
+ "update command.")
+ return
+ self._ext_clock_freq = freq
+ if self.get_clock_source() == 'external':
+ for slot, dboard in enumerate(self.dboards):
+ if hasattr(dboard, 'update_ref_clock_freq'):
+ self.log.trace(
+ "Updating reference clock on dboard %d to %f MHz...",
+ slot, freq/1e6
+ )
+ dboard.update_ref_clock_freq(freq)
+
+
+ def get_ref_clock_freq(self):
+ " Returns the currently active reference clock frequency"
+ clock_source = self.get_clock_source()
+ if clock_source == "internal" or clock_source == "gpsdo":
+ return E320_DEFAULT_INT_CLOCK_FREQ
+ elif clock_source == "external":
+ return self._ext_clock_freq
+
+ def get_time_sources(self):
+ " Returns list of valid time sources "
+ return ['internal', 'external', 'gpsdo']
+
+ def get_time_source(self):
+ " Return the currently selected time source "
+ return self._time_source
+
+ def set_time_source(self, time_source):
+ " Set a time source "
+ assert time_source in self.get_time_sources()
+ if time_source == self.get_time_source():
+ self.log.trace("Nothing to do -- time source already set.")
+ return
+ self._time_source = time_source
+ self.mboard_regs_control.set_time_source(time_source, self.get_ref_clock_freq())
+
+ ###########################################################################
+ # Hardware peripheral controls
+ ###########################################################################
+
+ def set_fp_gpio_master(self, value):
+ """set driver for front panel GPIO
+ Arguments:
+ value {unsigned} -- value is a single bit bit mask of 12 pins GPIO
+ """
+ self.mboard_regs_control.set_fp_gpio_master(value)
+
+ def get_fp_gpio_master(self):
+ """get "who" is driving front panel gpio
+ The return value is a bit mask of 8 pins GPIO.
+ 0: means the pin is driven by PL
+ 1: means the pin is driven by PS
+ """
+ return self.mboard_regs_control.get_fp_gpio_master()
+
+ def set_fp_gpio_radio_src(self, value):
+ """set driver for front panel GPIO
+ Arguments:
+ value {unsigned} -- value is 2-bit bit mask of 8 pins GPIO
+ 00: means the pin is driven by radio 0
+ 01: means the pin is driven by radio 1
+ """
+ self.mboard_regs_control.set_fp_gpio_radio_src(value)
+
+ def get_fp_gpio_radio_src(self):
+ """get which radio is driving front panel gpio
+ The return value is 2-bit bit mask of 8 pins GPIO.
+ 00: means the pin is driven by radio 0
+ 01: means the pin is driven by radio 1
+ """
+ return self.mboard_regs_control.get_fp_gpio_radio_src()
+
+ def enable_gps(self, enable):
+ """
+ Turn power to the GPS (CLK_GPS_PWR_EN) off or on.
+ """
+ self.mboard_regs_control.enable_gps(enable)
+
+ def enable_fp_gpio(self, voltage):
+ """
+ Turn power to the front panel GPIO off or on and set voltage
+ to (1.8, 2.5, 3.3V) and setting to 0 turns off GPIO.
+ """
+ self.log.trace("{} power to front-panel GPIO".format(
+ "Enabling" if voltage == 0 else "Disabling"
+ ))
+ self.mboard_regs_control.enable_fp_gpio(voltage)
+
+ def set_fp_gpio_voltage(self, value):
+ """
+ Set Front Panel GPIO voltage (1.8, 2.5 or 3.3 Volts)
+ """
+ self.log.trace("Setting front-panel GPIO voltage to {:3.1f} V".format(value))
+ self.mboard_regs_control.set_fp_gpio_voltage(value)
+
+ def get_fp_gpio_voltage(self):
+ """
+ Get Front Panel GPIO voltage (1.8, 2.5 or 3.3 Volts)
+ """
+ value = self.mboard_regs_control.get_fp_gpio_voltage()
+ self.log.trace("Current front-panel GPIO voltage {:3.1f} V".format(value))
+ return value
+
+ def set_channel_mode(self, channel_mode):
+ "Set channel mode in FPGA and select which tx channel to use"
+ self.mboard_regs_control.set_channel_mode(channel_mode)
+
+ ###########################################################################
+ # Sensors
+ ###########################################################################
+ def get_temp_sensor(self):
+ """
+ Get temperature sensor reading of the E320.
+ """
+ # TODO: This is Catalina's temperature. Do we want to return a different temp?
+ return self.catalina.get_temperature()
+
+ def get_gps_lock_sensor(self):
+ """
+ Get lock status of GPS as a sensor dict
+ """
+ self.log.trace("Reading status GPS lock pin from port expander")
+ raise NotImplementedError("GPS lock not implemented")
+ # FIXME put it in a register
+ # TODO: implement get_gps_lock, splits up functionality
+ #gps_locked = bool(self._gpios.get("GPS-LOCKOK"))
+ #return {
+ # 'name': 'gps_lock',
+ # 'type': 'BOOLEAN',
+ # 'unit': 'locked' if gps_locked else 'unlocked',
+ # 'value': str(gps_locked).lower(),
+ #}
+
+ # TODO: Add other GPS sensors (time, TPV, SKY, etc.)
+ # TODO: Add all physical sensors we can
+
+ ###########################################################################
+ # EEPROMs
+ ###########################################################################
+ def get_mb_eeprom(self):
+ """
+ Return a dictionary with EEPROM contents.
+
+ All key/value pairs are string -> string.
+
+ We don't actually return the EEPROM contents, instead, we return the
+ mboard info again. This filters the EEPROM contents to what we think
+ the user wants to know/see.
+ """
+ return self.mboard_info
+
+ def set_mb_eeprom(self, eeprom_vals):
+ """
+ See PeriphManagerBase.set_mb_eeprom() for docs.
+ """
+ self.log.warn("Called set_mb_eeprom(), but not implemented!")
+ raise NotImplementedError
+
+ def get_db_eeprom(self, dboard_idx):
+ """
+ See PeriphManagerBase.get_db_eeprom() for docs.
+ """
+ if dboard_idx != E320_DBOARD_SLOT_IDX:
+ self.log.warn("Trying to access invalid dboard index {}. "
+ "Using the only dboard.".format(dboard_idx))
+ db_eeprom_data = copy.copy(self.dboard.device_info)
+ for blob_id, blob in iteritems(self.dboard.get_user_eeprom_data()):
+ if blob_id in db_eeprom_data:
+ self.log.warn("EEPROM user data contains invalid blob ID "
+ "%s", blob_id)
+ else:
+ db_eeprom_data[blob_id] = blob
+ return db_eeprom_data
+
+ def set_db_eeprom(self, dboard_idx, eeprom_data):
+ """
+ Write new EEPROM contents with eeprom_map.
+
+ Arguments:
+ dboard_idx -- Slot index of dboard (can only be E320_DBOARD_SLOT_IDX)
+ eeprom_data -- Dictionary of EEPROM data to be written. It's up to the
+ specific device implementation on how to handle it.
+ """
+ if dboard_idx != E320_DBOARD_SLOT_IDX:
+ self.log.warn("Trying to access invalid dboard index {}. "
+ "Using the only dboard.".format(dboard_idx))
+ safe_db_eeprom_user_data = {}
+ for blob_id, blob in iteritems(eeprom_data):
+ if blob_id in self.dboard.device_info:
+ error_msg = "Trying to overwrite read-only EEPROM " \
+ "entry `{}'!".format(blob_id)
+ self.log.error(error_msg)
+ raise RuntimeError(error_msg)
+ if not isinstance(blob, str) and not isinstance(blob, bytes):
+ error_msg = "Blob data for ID `{}' is not a " \
+ "string!".format(blob_id)
+ self.log.error(error_msg)
+ raise RuntimeError(error_msg)
+ assert isinstance(blob, str)
+ safe_db_eeprom_user_data[blob_id] = blob.encode('ascii')
+ self.dboard.set_user_eeprom_data(safe_db_eeprom_user_data)
+
+ ###########################################################################
+ # Component updating
+ ###########################################################################
+ # Note: Component updating functions defined by ZynqComponents
+ @no_rpc
+ def _update_fpga_type(self):
+ """Update the fpga type stored in the updateable components"""
+ fpga_type = self.mboard_regs_control.get_fpga_type()
+ self.log.debug("Updating mboard FPGA type info to {}".format(fpga_type))
+ self.updateable_components['fpga']['type'] = fpga_type