#
# Copyright 2017 Ettus Research (National Instruments)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
"""
N310 implementation module
"""
from __future__ import print_function
import os
import copy
import shutil
from six import iteritems
from builtins import object
from .base import PeriphManagerBase
from ..net import get_iface_addrs
from ..net import byte_to_mac
from ..net import get_mac_addr
from ..mpmtypes import SID
from usrp_mpm.uio import UIO
from usrp_mpm.rpc_server import no_claim, no_rpc
from ..sysfs_gpio import SysFSGPIO
from ..ethtable import EthDispatcherTable
from .. import libpyusrp_periphs as lib
N3XX_DEFAULT_EXT_CLOCK_FREQ = 10e6
N3XX_DEFAULT_CLOCK_SOURCE = 'external'
N3XX_DEFAULT_TIME_SOURCE = 'internal'
N3XX_DEFAULT_ENABLE_GPS = True
N3XX_DEFAULT_ENABLE_FPGPIO = True
class TCA6424(object):
"""
Abstraction layer for the port/gpio expander
pins_list is an array of different version of TCA6424 pins map.
First element of this array corresponding to revC, second is revD etc...
"""
pins_list = [
(
'PWREN-CLK-MGT156MHz',
'NETCLK-CE', #revC name: 'PWREN-CLK-WB-CDCM',
'NETCLK-RESETn', #revC name: 'WB-CDCM-RESETn',
'NETCLK-PR0', #revC name: 'WB-CDCM-PR0',
'NETCLK-PR1', #revC name: 'WB-CDCM-PR1',
'NETCLK-OD0', #revC name: 'WB-CDCM-OD0',
'NETCLK-OD1', #revC name: 'WB-CDCM-OD1',
'NETCLK-OD2', #revC name: 'WB-CDCM-OD2',
'PWREN-CLK-MAINREF',
'CLK-MAINSEL-25MHz', #revC name: 'CLK-MAINREF-SEL1',
'CLK-MAINSEL-EX_B', #revC name: 'CLK-MAINREF-SEL0',
'12',
'CLK-MAINSEL-GPS', #revC name: '13',
'FPGA-GPIO-EN',
'PWREN-CLK-WB-20MHz',
'PWREN-CLK-WB-25MHz',
'GPS-PHASELOCK',
'GPS-nINITSURV',
'GPS-nRESET',
'GPS-WARMUP',
'GPS-SURVEY',
'GPS-LOCKOK',
'GPS-ALARM',
'PWREN-GPS',
),
(
'NETCLK-PR1',
'NETCLK-PR0',
'NETCLK-CE',
'NETCLK-RESETn',
'NETCLK-OD2',
'NETCLK-OD1',
'NETCLK-OD0',
'PWREN-CLK-MGT156MHz',
'PWREN-CLK-MAINREF',
'CLK-MAINSEL-25MHz',
'CLK-MAINSEL-EX_B',
'12',
'CLK-MAINSEL-GPS',
'FPGA-GPIO-EN',
'PWREN-CLK-WB-20MHz',
'PWREN-CLK-WB-25MHz',
'GPS-PHASELOCK',
'GPS-nINITSURV',
'GPS-nRESET',
'GPS-WARMUP',
'GPS-SURVEY',
'GPS-LOCKOK',
'GPS-ALARM',
'PWREN-GPS',
)]
def __init__(self, rev):
# Default state: Turn on GPS power, take GPS out of reset or
# init-survey, turn on 156.25 MHz clock
# min Support from revC or rev = 2
if rev == 2:
self.pins = self.pins_list[0]
else:
self.pins = self.pins_list[1]
default_val = 0x860101 if rev == 2 else 0x860780
self._gpios = SysFSGPIO('tca6424', 0xFFF7FF, 0x86F7FF, default_val)
def set(self, name, value=None):
"""
Assert a pin by name
"""
assert name in self.pins
self._gpios.set(self.pins.index(name), value=value)
def reset(self, name):
"""
Deassert a pin by name
"""
self.set(name, value=0)
def get(self, name):
"""
Read back a pin by name
"""
assert name in self.pins
return self._gpios.get(self.pins.index(name))
class FP_GPIO(object):
"""
Abstraction layer for the front panel GPIO
"""
EMIO_BASE = 54
FP_GPIO_OFFSET = 32 # Bit offset within the ps_gpio_* pins
def __init__(self, ddr):
self._gpiosize = 12
self._offset = self.FP_GPIO_OFFSET + self.EMIO_BASE
self.usemask = 0xFFF
self.ddr = ddr
self._gpios = SysFSGPIO(
'zynq_gpio',
self.usemask< string.
We don't actually return the EEPROM contents, instead, we return the
mboard info again. This filters the EEPROM contents to what we think
the user wants to know/see.
"""
return self.mboard_info
def set_mb_eeprom(self, eeprom_vals):
"""
See PeriphManagerBase.set_mb_eeprom() for docs.
"""
self.log.warn("Called set_mb_eeprom(), but not implemented!")
raise NotImplementedError
def get_db_eeprom(self, dboard_idx):
"""
See PeriphManagerBase.get_db_eeprom() for docs.
"""
try:
dboard = self.dboards[dboard_idx]
except KeyError:
error_msg = "Attempted to access invalid dboard index `{}' " \
"in get_db_eeprom()!".format(dboard_idx)
self.log.error(error_msg)
raise RuntimeError(error_msg)
db_eeprom_data = copy.copy(dboard.device_info)
if hasattr(dboard, 'get_user_eeprom_data') and \
callable(dboard.get_user_eeprom_data):
for blob_id, blob in iteritems(dboard.get_user_eeprom_data()):
if blob_id in db_eeprom_data:
self.log.warn("EEPROM user data contains invalid blob ID " \
"%s", blob_id)
else:
db_eeprom_data[blob_id] = blob
return db_eeprom_data
def set_db_eeprom(self, dboard_idx, eeprom_data):
"""
Write new EEPROM contents with eeprom_map.
Arguments:
dboard_idx -- Slot index of dboard
eeprom_data -- Dictionary of EEPROM data to be written. It's up to the
specific device implementation on how to handle it.
"""
try:
dboard = self.dboards[dboard_idx]
except KeyError:
error_msg = "Attempted to access invalid dboard index `{}' " \
"in set_db_eeprom()!".format(dboard_idx)
self.log.error(error_msg)
raise RuntimeError(error_msg)
if not hasattr(dboard, 'set_user_eeprom_data') or \
not callable(dboard.set_user_eeprom_data):
error_msg = "Dboard has no set_user_eeprom_data() method!"
self.log.error(error_msg)
raise RuntimeError(error_msg)
safe_db_eeprom_user_data = {}
for blob_id, blob in iteritems(eeprom_data):
if blob_id in dboard.device_info:
error_msg = "Trying to overwrite read-only EEPROM " \
"entry `{}'!".format(blob_id)
self.log.error(error_msg)
raise RuntimeError(error_msg)
if not isinstance(blob, str) and not isinstance(blob, bytes):
error_msg = "Blob data for ID `{}' is not a " \
"string!".format(blob_id)
self.log.error(error_msg)
raise RuntimeError(error_msg)
assert isinstance(blob, str)
safe_db_eeprom_user_data[blob_id] = blob.encode('ascii')
dboard.set_user_eeprom_data(safe_db_eeprom_user_data)
@no_rpc
def update_fpga(self, filepath, metadata):
"""
Update the FPGA image in the filesystem and reload the overlay
:param filepath: path to new FPGA image
:param metadata: Dictionary of strings containing metadata
"""
self.log.trace("Updating FPGA with image at {}"
.format(filepath))
_, file_extension = os.path.splitext(filepath)
# Cut off the period from the file extension
file_extension = file_extension[1:].lower()
if file_extension == "bit":
self.log.trace("Converting bit to bin file and writing to {}"
.format(self.binfile_path))
from usrp_mpm.fpga_bit_to_bin import fpga_bit_to_bin
fpga_bit_to_bin(filepath, self.binfile_path, flip=True)
elif file_extension == "bin":
self.log.trace("Copying bin file to {}"
.format(self.binfile_path))
shutil.copy(filepath, self.binfile_path)
else:
self.log.error("Invalid FPGA bitfile: {}"
.format(filepath))
raise RuntimeError("Invalid N310 FPGA bitfile")
# TODO: Implement reload procedure
return True