aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm')
-rw-r--r--mpm/python/usrp_mpm/periph_manager/n310.py412
-rw-r--r--mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py66
2 files changed, 223 insertions, 255 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/n310.py b/mpm/python/usrp_mpm/periph_manager/n310.py
index f50e0b778..91389ffc5 100644
--- a/mpm/python/usrp_mpm/periph_manager/n310.py
+++ b/mpm/python/usrp_mpm/periph_manager/n310.py
@@ -24,7 +24,7 @@ from usrp_mpm.mpmtypes import SID
from usrp_mpm.mpmutils import assert_compat_number
from usrp_mpm.rpc_server import no_rpc
from usrp_mpm.sys_utils import dtoverlay
-from usrp_mpm.sys_utils.sysfs_gpio import SysFSGPIO
+from usrp_mpm.sys_utils.sysfs_gpio import SysFSGPIO, GPIOBank
from usrp_mpm.sys_utils.uio import UIO
from usrp_mpm.sys_utils.sysfs_thermal import read_thermal_sensor_value
from usrp_mpm.xports import XportMgrUDP, XportMgrLiberio
@@ -133,7 +133,7 @@ class TCA6424(object):
return self._gpios.get(self.pins.index(name))
-class FP_GPIO(object):
+class FrontpanelGPIO(GPIOBank):
"""
Abstraction layer for the front panel GPIO
"""
@@ -141,68 +141,15 @@ class FP_GPIO(object):
FP_GPIO_OFFSET = 32 # Bit offset within the ps_gpio_* pins
def __init__(self, ddr):
- self._gpiosize = 12
- self._offset = self.FP_GPIO_OFFSET + self.EMIO_BASE
- self.usemask = 0xFFF
- self.ddr = ddr
- self._gpios = SysFSGPIO(
+ GPIOBank.__init__(
+ self,
'zynq_gpio',
- self.usemask<<self._offset,
- self.ddr<<self._offset
+ self.FP_GPIO_OFFSET + self.EMIO_BASE,
+ 0xFFF, # use_mask
+ ddr
)
- def set_all(self, value):
- """
- Assert all pin to the value.
- This method will convert value into binary and take the first 6 bits
- assign to 6pins.
- """
- bin_value = '{0:06b}'.format(value)
- wr_value = bin_value[-(self._gpiosize):]
- for i in range(self._gpiosize):
- if (1<<i)&self.ddr:
- self._gpios.set(self._offset+i, wr_value[i%6])
-
- def set(self, index, value=None):
- """
- Assert a pin by index
- """
- assert index in range(self._gpiosize)
- self._gpios.set(self._offset+index, value)
-
- def reset_all(self):
- """
- Deassert all pins
- """
- for i in range(self._gpiosize):
- self._gpios.reset(self._offset+i)
-
- def reset(self, index):
- """
- Deassert a pin by index
- """
- assert index in range(self._gpiosize)
- self._gpios.reset(self._offset+index)
-
- def get_all(self):
- """
- Read back all pins
- """
- result = 0
- for i in range(self._gpiosize):
- if not (1<<i)&self.ddr:
- value = self._gpios.get(self._offset+i)
- result = (result<<1) | value
- return result
-
- def get(self, index):
- """
- Read back a pin by index
- """
- assert index in range(self._gpiosize)
- return self._gpios.get(self._offset+index)
-
-class BackpanelGPIO(object):
+class BackpanelGPIO(GPIOBank):
"""
Abstraction layer for the back panel GPIO
"""
@@ -213,66 +160,173 @@ class BackpanelGPIO(object):
LED_GPS = 2
def __init__(self):
- self._gpiosize = 3
- self._offset = self.BP_GPIO_OFFSET + self.EMIO_BASE
- self.ddr = 0x7
- self.usemask = 0x7
- self._gpios = SysFSGPIO(
+ GPIOBank.__init__(
+ self,
'zynq_gpio',
- self.usemask << self._offset,
- self.ddr << self._offset
+ self.BP_GPIO_OFFSET + self.EMIO_BASE,
+ 0x7, # use_mask
+ 0x7, # ddr
)
- def set_all(self, value):
+class MboardRegsControl(object):
+ """
+ Control the FPGA Motherboard registers
+ """
+ # Motherboard registers
+ M_COMPAT_NUM = 0x0000
+ MB_DATESTAMP = 0x0004
+ MB_GIT_HASH = 0x0008
+ MB_SCRATCH = 0x000C
+ MB_NUM_CE = 0x0010
+ MB_NUM_IO_CE = 0x0014
+ MB_CLOCK_CTRL = 0x0018
+ MB_XADC_RB = 0x001C
+ MB_BUS_CLK_RATE = 0x0020
+ MB_BUS_COUNTER = 0x0024
+
+ # Bitfield locations for the MB_CLOCK_CTRL register.
+ MB_CLOCK_CTRL_PPS_SEL_INT_10 = 0 # pps_sel is one-hot encoded!
+ MB_CLOCK_CTRL_PPS_SEL_INT_25 = 1
+ MB_CLOCK_CTRL_PPS_SEL_EXT = 2
+ MB_CLOCK_CTRL_PPS_SEL_GPSDO = 3
+ MB_CLOCK_CTRL_PPS_OUT_EN = 4 # output enabled = 1
+ MB_CLOCK_CTRL_MEAS_CLK_RESET = 12 # set to 1 to reset mmcm, default is 0
+ MB_CLOCK_CTRL_MEAS_CLK_LOCKED = 13 # locked indication for meas_clk mmcm
+
+ def __init__(self, label, log):
+ self.log = log
+ self.regs = UIO(
+ label=label,
+ read_only=False
+ )
+ self.poke32 = self.regs.poke32
+ self.peek32 = self.regs.peek32
+
+ def get_compat_number(self):
+ """get FPGA compat number
+
+ This function reads back FPGA compat number.
+ The return is a tuple of
+ 2 numbers: (major compat number, minor compat number )
"""
- Set all pins to 'value'.
- This method will convert value into binary and take the first 3 bits
- assign to 3pins.
+ with self.regs.open():
+ compat_number = self.peek32(self.M_COMPAT_NUM)
+ minor = compat_number & 0xff
+ major = (compat_number>>16) & 0xff
+ return (major, minor)
+
+ def get_build_timestamp(self):
+ """
+ Returns the build date/time for the FPGA image.
+ The return is datetime string with the ISO 8601 format
+ (YYYY-MM-DD HH:MM:SS.mmmmmm)
"""
- bin_value = '{0:03b}'.format(value)
- wr_value = bin_value[-(self._gpiosize):]
- for i in range(self._gpiosize):
- if (1 << i) & self.ddr:
- self._gpios.set(self._offset + i, wr_value[i % 3])
+ with self.regs.open():
+ datestamp_rb = self.peek32(self.MB_DATESTAMP)
+ if datestamp_rb > 0:
+ dt_str = datetime.datetime(
+ year=((datestamp_rb>>17)&0x3F)+2000,
+ month=(datestamp_rb>>23)&0x0F,
+ day=(datestamp_rb>>27)&0x1F,
+ hour=(datestamp_rb>>12)&0x1F,
+ minute=(datestamp_rb>>6)&0x3F,
+ second=((datestamp_rb>>0)&0x3F))
+ self.log.trace("FPGA build timestamp: {}".format(str(dt_str)))
+ return str(dt_str)
+ else:
+ # Compatibility with FPGAs without datestamp capability
+ return ''
- def set(self, index, value=None):
+ def get_git_hash(self):
"""
- Set a pin by index
+ Returns the GIT hash for the FPGA build.
+ The return is a tuple of
+ 2 numbers: (short git hash, bool: is the tree dirty?)
"""
- assert index in range(self._gpiosize)
- self._gpios.set(self._offset + index, value)
+ with self.regs.open():
+ git_hash_rb = self.peek32(self.MB_GIT_HASH)
+ git_hash = git_hash_rb & 0x0FFFFFFF
+ tree_dirty = ((git_hash_rb & 0xF0000000) > 0)
+ dirtiness_qualifier = 'dirty' if tree_dirty else 'clean'
+ self.log.trace("FPGA build GIT Hash: {:07x} ({})".format(
+ git_hash, dirtiness_qualifier))
+ return (git_hash, dirtiness_qualifier)
- def reset_all(self):
+ def set_time_source(self, time_source, ref_clk_freq):
"""
- Clear all pins
+ Set time source
"""
- for i in range(self._gpiosize):
- self._gpios.reset(self._offset+i)
+ pps_sel_val = 0x0
+ if time_source == 'internal':
+ assert ref_clk_freq in (10e6, 25e6)
+ if ref_clk_freq == 10e6:
+ self.log.trace("Setting time source to internal "
+ "(10 MHz reference)...")
+ pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_INT_10
+ elif ref_clk_freq == 25e6:
+ self.log.trace("Setting time source to internal "
+ "(25 MHz reference)...")
+ pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_INT_25
+ elif time_source == 'external':
+ self.log.trace("Setting time source to external...")
+ pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_EXT
+ elif time_source == 'gpsdo':
+ self.log.trace("Setting time source to gpsdo...")
+ pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_GPSDO
+ else:
+ assert False
+ with self.regs.open():
+ reg_val = self.peek32(self.MB_CLOCK_CTRL) & 0xFFFFFFF0
+ reg_val = reg_val | (pps_sel_val & 0xF)
+ self.log.trace("Writing MB_CLOCK_CTRL to 0x{:08X}".format(reg_val))
+ self.poke32(self.MB_CLOCK_CTRL, reg_val)
- def reset(self, index):
+ def enable_pps_out(self, enable):
"""
- Clear a pin by index
+ Enables the PPS/Trig output on the back panel
"""
- assert index in range(self._gpiosize)
- self._gpios.reset(self._offset + index)
+ self.log.trace("%s PPS/Trig output!",
+ "Enabling" if enable else "Disabling")
+ mask = 0xFFFFFFFF ^ (0b1 << self.MB_CLOCK_CTRL_PPS_OUT_EN)
+ with self.regs.open():
+ # mask the bit to clear it:
+ reg_val = self.peek32(self.MB_CLOCK_CTRL) & mask
+ if enable:
+ # set the bit if desired:
+ reg_val = reg_val | (0b1 << self.MB_CLOCK_CTRL_PPS_OUT_EN)
+ self.log.trace("Writing MB_CLOCK_CTRL to 0x{:08X}".format(reg_val))
+ self.poke32(self.MB_CLOCK_CTRL, reg_val)
- def get_all(self):
+ def reset_meas_clk_mmcm(self, reset=True):
"""
- Read back all pins
+ Reset or unreset the MMCM for the measurement clock in the FPGA TDC.
"""
- result = 0
- for i in range(self._gpiosize):
- if not (1<<i)&self.ddr:
- value = self._gpios.get(self._offset + i)
- result = (result << 1) | value
- return result
+ self.log.trace("%s measurement clock MMCM reset...",
+ "Asserting" if reset else "Clearing")
+ mask = 0xFFFFFFFF ^ (0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_RESET)
+ with self.regs.open():
+ # mask the bit to clear it
+ reg_val = self.peek32(self.MB_CLOCK_CTRL) & mask
+ if reset:
+ # set the bit if desired
+ reg_val = reg_val | (0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_RESET)
+ self.log.trace("Writing MB_CLOCK_CTRL to 0x{:08X}".format(reg_val))
+ self.poke32(self.MB_CLOCK_CTRL, reg_val)
- def get(self, index):
+ def get_meas_clock_mmcm_lock(self):
"""
- Read back a pin by index
+ Check the status of the MMCM for the measurement clock in the FPGA TDC.
"""
- assert index in range(self._gpiosize)
- return self._gpios.get(self._offset + index)
+ mask = 0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_LOCKED
+ with self.regs.open():
+ reg_val = self.peek32(self.MB_CLOCK_CTRL)
+ locked = (reg_val & mask) > 0
+ if not locked:
+ self.log.warning("Measurement clock MMCM reporting unlocked. "
+ "MB_CLOCK_CTRL reg: 0x{:08X}".format(reg_val))
+ else:
+ self.log.trace("Measurement clock MMCM locked!")
+ return locked
###############################################################################
@@ -1065,7 +1119,7 @@ class n310(PeriphManagerBase):
run any actions on claiming (e.g., light up an LED).
"""
# Light up LINK
- self._bp_leds.set(self._bp_leds.LED_LINK, 1);
+ self._bp_leds.set(self._bp_leds.LED_LINK, 1)
def unclaim(self):
"""
@@ -1073,157 +1127,5 @@ class n310(PeriphManagerBase):
to run any actions on claiming (e.g., turn off an LED).
"""
# Turn off LINK
- self._bp_leds.set(self._bp_leds.LED_LINK, 0);
-
-class MboardRegsControl(object):
- """
- Control the FPGA Motherboard registers
- """
- # Motherboard registers
- M_COMPAT_NUM = 0x0000
- MB_DATESTAMP = 0x0004
- MB_GIT_HASH = 0x0008
- MB_SCRATCH = 0x000C
- MB_NUM_CE = 0x0010
- MB_NUM_IO_CE = 0x0014
- MB_CLOCK_CTRL = 0x0018
- MB_XADC_RB = 0x001C
- MB_BUS_CLK_RATE = 0x0020
- MB_BUS_COUNTER = 0x0024
-
- # Bitfield locations for the MB_CLOCK_CTRL register.
- MB_CLOCK_CTRL_PPS_SEL_INT_10 = 0 # pps_sel is one-hot encoded!
- MB_CLOCK_CTRL_PPS_SEL_INT_25 = 1
- MB_CLOCK_CTRL_PPS_SEL_EXT = 2
- MB_CLOCK_CTRL_PPS_SEL_GPSDO = 3
- MB_CLOCK_CTRL_PPS_OUT_EN = 4 # output enabled = 1
- MB_CLOCK_CTRL_MEAS_CLK_RESET = 12 # set to 1 to reset mmcm, default is 0
- MB_CLOCK_CTRL_MEAS_CLK_LOCKED = 13 # locked indication for meas_clk mmcm
-
- def __init__(self, label, log):
- self.log = log
- self.regs = UIO(
- label=label,
- read_only=False
- )
- self.poke32 = self.regs.poke32
- self.peek32 = self.regs.peek32
-
- def get_compat_number(self):
- """get FPGA compat number
-
- This function reads back FPGA compat number.
- The return is a tuple of
- 2 numbers: (major compat number, minor compat number )
- """
- with self.regs.open():
- compat_number = self.peek32(self.M_COMPAT_NUM)
- minor = compat_number & 0xff
- major = (compat_number>>16) & 0xff
- return (major, minor)
-
- def get_build_timestamp(self):
- """
- Returns the build date/time for the FPGA image.
- The return is datetime string with the ISO 8601 format
- (YYYY-MM-DD HH:MM:SS.mmmmmm)
- """
- with self.regs.open():
- datestamp_rb = self.peek32(self.MB_DATESTAMP)
- if datestamp_rb > 0:
- dt_str = datetime.datetime(
- year=((datestamp_rb>>17)&0x3F)+2000,
- month=(datestamp_rb>>23)&0x0F,
- day=(datestamp_rb>>27)&0x1F,
- hour=(datestamp_rb>>12)&0x1F,
- minute=(datestamp_rb>>6)&0x3F,
- second=((datestamp_rb>>0)&0x3F))
- self.log.trace("FPGA build timestamp: {}".format(str(dt_str)))
- return str(dt_str)
- else:
- # Compatibility with FPGAs without datestamp capability
- return ''
-
- def get_git_hash(self):
- """
- Returns the GIT hash for the FPGA build.
- The return is a tuple of
- 2 numbers: (short git hash, bool: is the tree dirty?)
- """
- with self.regs.open():
- git_hash_rb = self.peek32(self.MB_GIT_HASH)
- git_hash = git_hash_rb & 0x0FFFFFFF
- tree_dirty = ((git_hash_rb & 0xF0000000) > 0)
- dirtiness_qualifier = 'dirty' if tree_dirty else 'clean'
- self.log.trace("FPGA build GIT Hash: {:07x} ({})".format(
- git_hash, dirtiness_qualifier))
- return (git_hash, dirtiness_qualifier)
-
- def set_time_source(self, time_source, ref_clk_freq):
- """
- Set time source
- """
- pps_sel_val = 0x0
- if time_source == 'internal':
- assert ref_clk_freq in (10e6, 25e6)
- if ref_clk_freq == 10e6:
- self.log.trace("Setting time source to internal (10 MHz reference)...")
- pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_INT_10
- elif ref_clk_freq == 25e6:
- self.log.trace("Setting time source to internal (25 MHz reference)...")
- pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_INT_25
- elif time_source == 'external':
- self.log.trace("Setting time source to external...")
- pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_EXT
- elif time_source == 'gpsdo':
- self.log.trace("Setting time source to gpsdo...")
- pps_sel_val = 0b1 << self.MB_CLOCK_CTRL_PPS_SEL_GPSDO
- else:
- assert False
- with self.regs.open():
- reg_val = self.peek32(self.MB_CLOCK_CTRL) & 0xFFFFFFF0; # clear lowest nibble
- reg_val = reg_val | (pps_sel_val & 0xF) # set lowest nibble
- self.log.trace("Writing MB_CLOCK_CTRL to 0x{:08X}".format(reg_val))
- self.poke32(self.MB_CLOCK_CTRL, reg_val)
-
- def enable_pps_out(self, enable):
- """
- Enables the PPS/Trig output on the back panel
- """
- self.log.trace("%s PPS/Trig output!", "Enabling" if enable else "Disabling")
- mask = 0xFFFFFFFF ^ (0b1 << self.MB_CLOCK_CTRL_PPS_OUT_EN)
- with self.regs.open():
- reg_val = self.peek32(self.MB_CLOCK_CTRL) & mask # mask the bit to clear it
- if enable:
- reg_val = reg_val | (0b1 << self.MB_CLOCK_CTRL_PPS_OUT_EN) # set the bit if desired
- self.log.trace("Writing MB_CLOCK_CTRL to 0x{:08X}".format(reg_val))
- self.poke32(self.MB_CLOCK_CTRL, reg_val)
-
- def reset_meas_clk_mmcm(self, reset=True):
- """
- Reset or unreset the MMCM for the measurement clock in the FPGA TDC.
- """
- self.log.trace("%s measurement clock MMCM reset...", "Asserting" if reset else "Clearing")
- mask = 0xFFFFFFFF ^ (0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_RESET)
- with self.regs.open():
- reg_val = self.peek32(self.MB_CLOCK_CTRL) & mask # mask the bit to clear it
- if reset:
- reg_val = reg_val | (0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_RESET) # set the bit if desired
- self.log.trace("Writing MB_CLOCK_CTRL to 0x{:08X}".format(reg_val))
- self.poke32(self.MB_CLOCK_CTRL, reg_val)
-
- def get_meas_clock_mmcm_lock(self):
- """
- Check the status of the MMCM for the measurement clock in the FPGA TDC.
- """
- mask = 0b1 << self.MB_CLOCK_CTRL_MEAS_CLK_LOCKED
- with self.regs.open():
- reg_val = self.peek32(self.MB_CLOCK_CTRL)
- locked = (reg_val & mask) > 0
- if not locked:
- self.log.warning("Measurement clock MMCM reporting unlocked. MB_CLOCK_CTRL "
- "reg: 0x{:08X}".format(reg_val))
- else:
- self.log.trace("Measurement clock MMCM locked!")
- return locked
+ self._bp_leds.set(self._bp_leds.LED_LINK, 0)
diff --git a/mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py b/mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py
index 29056a51f..4911c1f75 100644
--- a/mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py
+++ b/mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py
@@ -186,3 +186,69 @@ class SysFSGPIO(object):
self.log.trace("Reading value {} from `{}'...".format(read_value, value_path))
return read_value
+class GPIOBank(object):
+ """
+ Extension of a SysFSGPIO
+ """
+ def __init__(self, uio_label, offset, usemask, ddr):
+ self._gpiosize = bin(usemask).count("1")
+ self._offset = offset
+ self._ddr = ddr
+ self._usemask = usemask
+ self._gpios = SysFSGPIO(
+ uio_label,
+ self._usemask << self._offset,
+ self._ddr << self._offset
+ )
+
+ def set_all(self, value):
+ """
+ Set all pins to 'value'.
+ This method will convert value into binary and assign all the bits in
+ the use mask.
+ """
+ bin_value = ('{0:0'+self._gpiosize+'b}').format(value)
+ wr_value = bin_value[-(self._gpiosize):]
+ for i in range(self._gpiosize):
+ if (1 << i) & self._ddr:
+ self._gpios.set(self._offset + i, wr_value[i % self._gpiosize])
+
+ def set(self, index, value=None):
+ """
+ Set a pin by index
+ """
+ assert index in range(self._gpiosize)
+ self._gpios.set(self._offset + index, value)
+
+ def reset_all(self):
+ """
+ Clear all pins
+ """
+ for i in range(self._gpiosize):
+ self._gpios.reset(self._offset+i)
+
+ def reset(self, index):
+ """
+ Clear a pin by index
+ """
+ assert index in range(self._gpiosize)
+ self._gpios.reset(self._offset + index)
+
+ def get_all(self):
+ """
+ Read back all pins
+ """
+ result = 0
+ for i in range(self._gpiosize):
+ if not (1<<i)&self._ddr:
+ value = self._gpios.get(self._offset + i)
+ result = (result << 1) | value
+ return result
+
+ def get(self, index):
+ """
+ Read back a pin by index
+ """
+ assert index in range(self._gpiosize)
+ return self._gpios.get(self._offset + index)
+