#
# Copyright 2017 Ettus Research (National Instruments)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
N310 implementation module
"""
from __future__ import print_function
import os
import copy
import shutil
import subprocess
import json
from six import iteritems, itervalues
from builtins import object
from usrp_mpm.gpsd_iface import GPSDIface
from usrp_mpm.periph_manager import PeriphManagerBase
from usrp_mpm.mpmtypes import SID
from usrp_mpm.rpc_server import no_rpc
from usrp_mpm.sys_utils import dtoverlay
from usrp_mpm.sys_utils.sysfs_gpio import SysFSGPIO
from usrp_mpm.sys_utils.uio import UIO
from usrp_mpm.xports import XportMgrUDP, XportMgrLiberio
# Module-level defaults for N3xx devices.
# NOTE(review): the code consuming these values is not visible in this part of
# the file; the per-line notes below are inferred from the names -- confirm
# against the periph manager's init code.
N3XX_DEFAULT_EXT_CLOCK_FREQ = 10e6  # presumably Hz (10 MHz) -- TODO confirm
N3XX_DEFAULT_CLOCK_SOURCE = 'external'  # presumably the initial clock source
N3XX_DEFAULT_TIME_SOURCE = 'internal'  # presumably the initial time source
N3XX_DEFAULT_ENABLE_GPS = True  # presumably: GPS enabled unless overridden
N3XX_DEFAULT_ENABLE_FPGPIO = True  # presumably: front-panel GPIO enabled by default
###############################################################################
# Additional peripheral controllers specific to Magnesium
###############################################################################
class TCA6424(object):
    """
    Name-based access to the pins of the TCA6424 port/GPIO expander.

    pins_list holds one tuple of pin names per supported pin map. The position
    of a name inside its tuple is the pin index handed to the GPIO backend.
    Entry 0 is the revC map; entry 1 is used for every other revision (revD
    and later).
    """
    pins_list = [
        # Pin map for revC boards (rev == 2)
        (
            'PWREN-CLK-MGT156MHz',
            'NETCLK-CE',         #revC name: 'PWREN-CLK-WB-CDCM',
            'NETCLK-RESETn',     #revC name: 'WB-CDCM-RESETn',
            'NETCLK-PR0',        #revC name: 'WB-CDCM-PR0',
            'NETCLK-PR1',        #revC name: 'WB-CDCM-PR1',
            'NETCLK-OD0',        #revC name: 'WB-CDCM-OD0',
            'NETCLK-OD1',        #revC name: 'WB-CDCM-OD1',
            'NETCLK-OD2',        #revC name: 'WB-CDCM-OD2',
            'PWREN-CLK-MAINREF',
            'CLK-MAINSEL-25MHz', #revC name: 'CLK-MAINREF-SEL1',
            'CLK-MAINSEL-EX_B',  #revC name: 'CLK-MAINREF-SEL0',
            '12',
            'CLK-MAINSEL-GPS',   #revC name: '13',
            'FPGA-GPIO-EN',
            'PWREN-CLK-WB-20MHz',
            'PWREN-CLK-WB-25MHz',
            'GPS-PHASELOCK',
            'GPS-nINITSURV',
            'GPS-nRESET',
            'GPS-WARMUP',
            'GPS-SURVEY',
            'GPS-LOCKOK',
            'GPS-ALARM',
            'PWREN-GPS',
        ),
        # Pin map for all later revisions
        (
            'NETCLK-PR1',
            'NETCLK-PR0',
            'NETCLK-CE',
            'NETCLK-RESETn',
            'NETCLK-OD2',
            'NETCLK-OD1',
            'NETCLK-OD0',
            'PWREN-CLK-MGT156MHz',
            'PWREN-CLK-MAINREF',
            'CLK-MAINSEL-25MHz',
            'CLK-MAINSEL-EX_B',
            '12',
            'CLK-MAINSEL-GPS',
            'FPGA-GPIO-EN',
            'PWREN-CLK-WB-20MHz',
            'PWREN-CLK-WB-25MHz',
            'GPS-PHASELOCK',
            'GPS-nINITSURV',
            'GPS-nRESET',
            'GPS-WARMUP',
            'GPS-SURVEY',
            'GPS-LOCKOK',
            'GPS-ALARM',
            'PWREN-GPS',
        ),
    ]

    def __init__(self, rev):
        """
        Select the pin map for the given board revision and open the GPIOs.

        Arguments:
        rev -- Board revision number. 2 (revC) is the oldest supported
               revision; any other value selects the newer pin map.
        """
        # The default value differs between the maps because the pins moved.
        # Default state: Turn on GPS power, take GPS out of reset or
        # init-survey, turn on 156.25 MHz clock
        if rev == 2:
            map_index, default_val = 0, 0x860101
        else:
            map_index, default_val = 1, 0x860780
        self.pins = self.pins_list[map_index]
        # NOTE(review): the two mask arguments are passed through as-is; their
        # exact meaning is defined by SysFSGPIO -- confirm there.
        self._gpios = SysFSGPIO('tca6424', 0xFFF7FF, 0x86F7FF, default_val)

    def set(self, name, value=None):
        """
        Assert a pin by name.

        Arguments:
        name -- Pin name; must be present in the active pin map.
        value -- Forwarded unchanged to the GPIO backend.
        """
        assert name in self.pins
        pin_index = self.pins.index(name)
        self._gpios.set(pin_index, value=value)

    def reset(self, name):
        """
        Deassert a pin by name (same as set() with value 0).
        """
        self.set(name, value=0)

    def get(self, name):
        """
        Read back a pin by name and return what the GPIO backend reports.
        """
        assert name in self.pins
        pin_index = self.pins.index(name)
        return self._gpios.get(pin_index)
class FP_GPIO(object):
"""
Abstraction layer for the front panel GPIO
"""
EMIO_BASE = 54
FP_GPIO_OFFSET = 32 # Bit offset within the ps_gpio_* pins
def __init__(self, ddr):
self._gpiosize = 12
self._offset = self.FP_GPIO_OFFSET + self.EMIO_BASE
self.usemask = 0xFFF
self.ddr = ddr
self._gpios = SysFSGPIO(
'zynq_gpio',
self.usemask< string.
We don't actually return the EEPROM contents, instead, we return the
mboard info again. This filters the EEPROM contents to what we think
the user wants to know/see.
"""
return self.mboard_info
def set_mb_eeprom(self, eeprom_vals):
    """
    See PeriphManagerBase.set_mb_eeprom() for docs.

    Writing the motherboard EEPROM is not implemented here: the call is
    logged as a warning and then rejected.

    Arguments:
    eeprom_vals -- Requested EEPROM values (unused)

    Raises:
    NotImplementedError -- always
    """
    # Use warning() rather than the deprecated warn() alias.
    # NOTE(review): assumes self.log follows the stdlib logging.Logger API.
    self.log.warning("Called set_mb_eeprom(), but not implemented!")
    raise NotImplementedError
def get_db_eeprom(self, dboard_idx):
    """
    See PeriphManagerBase.get_db_eeprom() for docs.

    Returns a shallow copy of the dboard's device_info, extended with the
    dboard's user EEPROM blobs if it provides get_user_eeprom_data(). A user
    blob whose ID collides with a device_info key is skipped with a warning,
    so device_info entries are never overridden.

    Arguments:
    dboard_idx -- Slot index of dboard

    Raises:
    RuntimeError -- if dboard_idx does not refer to a known dboard
    """
    try:
        dboard = self.dboards[dboard_idx]
    # Cover both container kinds: a dict raises KeyError, a list raises
    # IndexError for an unknown slot.
    except (KeyError, IndexError):
        error_msg = "Attempted to access invalid dboard index `{}' " \
                    "in get_db_eeprom()!".format(dboard_idx)
        self.log.error(error_msg)
        raise RuntimeError(error_msg)
    db_eeprom_data = copy.copy(dboard.device_info)
    get_user_data = getattr(dboard, 'get_user_eeprom_data', None)
    if callable(get_user_data):
        for blob_id, blob in get_user_data().items():
            if blob_id in db_eeprom_data:
                # warning() instead of the deprecated warn() alias
                self.log.warning("EEPROM user data contains invalid blob ID " \
                                 "%s", blob_id)
            else:
                db_eeprom_data[blob_id] = blob
    return db_eeprom_data
def set_db_eeprom(self, dboard_idx, eeprom_data):
    """
    Write new EEPROM contents with eeprom_map.

    All entries are validated before anything is handed to the dboard, so a
    rejected request writes nothing.

    Arguments:
    dboard_idx -- Slot index of dboard
    eeprom_data -- Dictionary of EEPROM data to be written. It's up to the
                   specific device implementation on how to handle it.
                   Values must be strings (stored ASCII-encoded) or bytes
                   (stored unchanged).

    Raises:
    RuntimeError -- if dboard_idx is invalid, the dboard has no
                    set_user_eeprom_data() method, a blob ID collides with
                    a device_info entry, or a blob is neither str nor bytes
    UnicodeEncodeError -- if a str blob contains non-ASCII characters
    """
    try:
        dboard = self.dboards[dboard_idx]
    # Cover both container kinds: a dict raises KeyError, a list raises
    # IndexError for an unknown slot.
    except (KeyError, IndexError):
        error_msg = "Attempted to access invalid dboard index `{}' " \
                    "in set_db_eeprom()!".format(dboard_idx)
        self.log.error(error_msg)
        raise RuntimeError(error_msg)
    if not callable(getattr(dboard, 'set_user_eeprom_data', None)):
        error_msg = "Dboard has no set_user_eeprom_data() method!"
        self.log.error(error_msg)
        raise RuntimeError(error_msg)
    safe_db_eeprom_user_data = {}
    for blob_id, blob in eeprom_data.items():
        if blob_id in dboard.device_info:
            error_msg = "Trying to overwrite read-only EEPROM " \
                        "entry `{}'!".format(blob_id)
            self.log.error(error_msg)
            raise RuntimeError(error_msg)
        if isinstance(blob, bytes):
            # Already raw bytes: store as-is. Previously bytes passed the
            # type check but then tripped an assert/encode meant for str.
            safe_db_eeprom_user_data[blob_id] = blob
        elif isinstance(blob, str):
            safe_db_eeprom_user_data[blob_id] = blob.encode('ascii')
        else:
            error_msg = "Blob data for ID `{}' is not a " \
                        "string!".format(blob_id)
            self.log.error(error_msg)
            raise RuntimeError(error_msg)
    dboard.set_user_eeprom_data(safe_db_eeprom_user_data)
@no_rpc
def update_fpga(self, filepath, metadata):
    """
    Install a new FPGA image at the path configured for the 'fpga'
    updateable component. A .bin file is copied as-is, a .bit file is
    converted to bin format on the way; the extension is matched
    case-insensitively. Any other extension raises RuntimeError.
    :param filepath: path to new FPGA image
    :param metadata: Dictionary of strings containing metadata
    :returns: True on success
    """
    self.log.trace("Updating FPGA with image at {} (metadata: `{}')"
                   .format(filepath, str(metadata)))
    # Extension without its leading period, lower-cased for comparison
    file_extension = os.path.splitext(filepath)[1][1:].lower()
    binfile_path = self.updateable_components['fpga']['path']
    if file_extension not in ("bit", "bin"):
        self.log.error("Invalid FPGA bitfile: {}"
                       .format(filepath))
        raise RuntimeError("Invalid N310 FPGA bitfile")
    if file_extension == "bin":
        self.log.trace("Copying bin file to {}"
                       .format(binfile_path))
        shutil.copy(filepath, binfile_path)
    else:
        self.log.trace("Converting bit to bin file and writing to {}"
                       .format(binfile_path))
        # Imported here so the converter is only loaded when it is needed
        from usrp_mpm.fpga_bit_to_bin import fpga_bit_to_bin
        fpga_bit_to_bin(filepath, binfile_path, flip=True)
    # RPC server will reload the periph manager after this.
    return True
@no_rpc
def update_dts(self, filepath, metadata):
    """
    Update the DTS image in the filesystem and compile it with `dtc' to the
    path configured as the 'dts' component's 'output'.
    :param filepath: path to new DTS image
    :param metadata: Dictionary of strings containing metadata
    :returns: False if `dtc' ran and reported an error; True otherwise,
              including when `dtc' could not be executed at all (that case
              is only logged)
    """
    dtsfile_path = self.updateable_components['dts']['path']
    self.log.trace("Updating DTS with image at %s to %s (metadata: %s)",
                   filepath, dtsfile_path, str(metadata))
    shutil.copy(filepath, dtsfile_path)
    dtbofile_path = self.updateable_components['dts']['output']
    self.log.trace("Compiling to %s...", dtbofile_path)
    dtc_command = [
        'dtc',
        '--symbols',
        '-O', 'dtb',
        '-q', # Suppress warnings
        '-o',
        dtbofile_path,
        dtsfile_path,
    ]
    self.log.trace("Executing command: `$ %s'", " ".join(dtc_command))
    try:
        out = subprocess.check_output(dtc_command)
    except OSError:
        self.log.error("Could not execute `dtc' command. Binary probably "\
                       "not installed. Please compile DTS by hand.")
        # No fatal error here, in order not to break the current workflow
    except subprocess.CalledProcessError as ex:
        self.log.error("Error executing `dtc': %s", str(ex))
        return False
    else:
        # check_output() returns bytes on Python 3, so comparing against ""
        # was always true. Test for emptiness instead and decode so the log
        # shows text rather than a bytes repr.
        if out.strip():
            self.log.debug("`dtc' command output: \n%s",
                           out.decode('utf-8', 'replace'))
    return True
class MboardRegsControl(object):
    """
    Accessor for the FPGA motherboard registers.

    Every register access goes through the peek32/poke32 callables of the
    regs object handed to the constructor.
    """
    # Register offsets
    MB_DESIGN_REV = 0x0000
    MB_DATESTAMP = 0x0004
    MB_GIT_HASH = 0x0008
    MB_BUS_COUNTER = 0x00C
    MB_NUM_CE = 0x0010
    MB_SCRATCH = 0x0014
    MB_CLOCK_CTRL = 0x0018
    # Bit positions inside MB_CLOCK_CTRL
    MB_CLOCK_CTRL_PPS_SEL_INT_10 = 0 # pps_sel is one-hot encoded!
    MB_CLOCK_CTRL_PPS_SEL_INT_25 = 1
    MB_CLOCK_CTRL_PPS_SEL_EXT = 2
    MB_CLOCK_CTRL_PPS_SEL_GPSDO = 3
    MB_CLOCK_CTRL_PPS_OUT_EN = 4 # output enabled = 1
    MB_CLOCK_CTRL_MEAS_CLK_RESET = 12 # set to 1 to reset mmcm, default is 0
    MB_CLOCK_CTRL_MEAS_CLK_LOCKED = 13 # locked indication for meas_clk mmcm

    def __init__(self, regs, log):
        """
        Arguments:
        regs -- Object providing peek32(addr) and poke32(addr, value)
        log -- Logger; trace() and warning() are used
        """
        self.log = log
        self.regs = regs
        self.poke32 = self.regs.poke32
        self.peek32 = self.regs.peek32

    def _modify_clock_ctrl(self, keep_mask, set_bits):
        """
        Read-modify-write MB_CLOCK_CTRL: keep the bits selected by keep_mask,
        OR in set_bits, log the new value and write it back.
        """
        reg_val = (self.peek32(self.MB_CLOCK_CTRL) & keep_mask) | set_bits
        self.log.trace("Writing MB_CLOCK_CTRL to 0x{:08X}".format(reg_val))
        self.poke32(self.MB_CLOCK_CTRL, reg_val)

    def get_git_hash(self):
        """
        Read, log and return the GIT hash of the FPGA build.
        """
        git_hash = self.peek32(self.MB_GIT_HASH)
        self.log.trace("FPGA build GIT Hash: 0x{:08X}".format(git_hash))
        return git_hash

    def set_time_source(self, time_source, ref_clk_freq):
        """
        Select the PPS source in the lowest nibble of MB_CLOCK_CTRL.

        Arguments:
        time_source -- 'internal', 'external' or 'gpsdo'
        ref_clk_freq -- Reference clock frequency; only looked at for
                        'internal', where it must be 10e6 or 25e6
        """
        sel_bit = None
        if time_source == 'internal':
            assert ref_clk_freq in (10e6, 25e6)
            if ref_clk_freq == 10e6:
                self.log.trace("Setting time source to internal (10 MHz reference)...")
                sel_bit = self.MB_CLOCK_CTRL_PPS_SEL_INT_10
            elif ref_clk_freq == 25e6:
                self.log.trace("Setting time source to internal (25 MHz reference)...")
                sel_bit = self.MB_CLOCK_CTRL_PPS_SEL_INT_25
        elif time_source == 'external':
            self.log.trace("Setting time source to external...")
            sel_bit = self.MB_CLOCK_CTRL_PPS_SEL_EXT
        elif time_source == 'gpsdo':
            self.log.trace("Setting time source to gpsdo...")
            sel_bit = self.MB_CLOCK_CTRL_PPS_SEL_GPSDO
        else:
            assert False
        # pps_sel is one-hot: exactly one bit of the nibble gets set
        pps_sel_val = 0x0 if sel_bit is None else 0b1 << sel_bit
        # Clear the lowest nibble, then set the selected bit
        self._modify_clock_ctrl(0xFFFFFFF0, pps_sel_val & 0xF)

    def enable_pps_out(self, enable):
        """
        Enable or disable the PPS/Trig output on the back panel.
        """
        self.log.trace("%s PPS/Trig output!", "Enabling" if enable else "Disabling")
        out_en_bit = 0b1 << self.MB_CLOCK_CTRL_PPS_OUT_EN
        # The bit is always cleared first and only set again if requested
        self._modify_clock_ctrl(0xFFFFFFFF ^ out_en_bit,
                                out_en_bit if enable else 0)

    def reset_meas_clk_mmcm(self, reset=True):
        """
        Assert (reset=True) or clear (reset=False) the reset of the MMCM for
        the measurement clock in the FPGA TDC.
        """
        self.log.trace("%s measurement clock MMCM reset...", "Asserting" if reset else "Clearing")
        reset_bit = 0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_RESET
        # The bit is always cleared first and only set again if requested
        self._modify_clock_ctrl(0xFFFFFFFF ^ reset_bit,
                                reset_bit if reset else 0)

    def get_meas_clock_mmcm_lock(self):
        """
        Return True if the MMCM for the measurement clock in the FPGA TDC
        reports lock, False (plus a logged warning) otherwise.
        """
        reg_val = self.peek32(self.MB_CLOCK_CTRL)
        lock_bit = 0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_LOCKED
        locked = (reg_val & lock_bit) > 0
        if locked:
            self.log.trace("Measurement clock MMCM locked!")
        else:
            self.log.warning("Measurement clock MMCM reporting unlocked. MB_CLOCK_CTRL "
                             "reg: 0x{:08X}".format(reg_val))
        return locked