aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/sys_utils
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2017-12-12 09:59:50 -0800
committerMartin Braun <martin.braun@ettus.com>2017-12-22 15:05:58 -0800
commitd3e6dd11406893bfbc5537dfbe74d8151bbc1280 (patch)
tree8663263b3c5a4ff7202e01a5f9c5c6bb23969829 /mpm/python/usrp_mpm/sys_utils
parentea7cc7f8250cd9bdb41c1f22703d38be91fb6a84 (diff)
downloaduhd-d3e6dd11406893bfbc5537dfbe74d8151bbc1280.tar.gz
uhd-d3e6dd11406893bfbc5537dfbe74d8151bbc1280.tar.bz2
uhd-d3e6dd11406893bfbc5537dfbe74d8151bbc1280.zip
mpm: Harmonize imports, tidy + sort modules
- Moved nijesdcore to cores/ - Moved udev, net, dtoverlay, uio to sys_utils/ - Made all imports non-relative (except in __init__.py files) - Removed some unnecessary imports - Reordered some imports for Python conventions
Diffstat (limited to 'mpm/python/usrp_mpm/sys_utils')
-rw-r--r--mpm/python/usrp_mpm/sys_utils/CMakeLists.txt18
-rw-r--r--mpm/python/usrp_mpm/sys_utils/__init__.py11
-rw-r--r--mpm/python/usrp_mpm/sys_utils/dtoverlay.py129
-rw-r--r--mpm/python/usrp_mpm/sys_utils/net.py130
-rw-r--r--mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py199
-rw-r--r--mpm/python/usrp_mpm/sys_utils/udev.py51
-rw-r--r--mpm/python/usrp_mpm/sys_utils/uio.py167
7 files changed, 705 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/sys_utils/CMakeLists.txt b/mpm/python/usrp_mpm/sys_utils/CMakeLists.txt
new file mode 100644
index 000000000..1b5a4ed0e
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/CMakeLists.txt
@@ -0,0 +1,18 @@
+#
+# Copyright 2017 Ettus Research, National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0
+#
+
+SET(USRP_MPM_FILES ${USRP_MPM_FILES})
+SET(USRP_MPM_SYSUTILS_FILES
+ ${CMAKE_CURRENT_SOURCE_DIR}/__init__.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/dtoverlay.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/net.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/sysfs_gpio.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/udev.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/uio.py
+)
+LIST(APPEND USRP_MPM_FILES ${USRP_MPM_SYSUTILS_FILES})
+SET(USRP_MPM_FILES ${USRP_MPM_FILES} PARENT_SCOPE)
+
diff --git a/mpm/python/usrp_mpm/sys_utils/__init__.py b/mpm/python/usrp_mpm/sys_utils/__init__.py
new file mode 100644
index 000000000..61be8ac53
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/__init__.py
@@ -0,0 +1,11 @@
+#
+# Copyright 2017 Ettus Research, National Instruments Company
+#
+# SPDX-License-Identifier: GPL-3.0
+#
+"""
+System/OS Utilities
+
+These are convenience functions to access Linux subsystems.
+"""
+
diff --git a/mpm/python/usrp_mpm/sys_utils/dtoverlay.py b/mpm/python/usrp_mpm/sys_utils/dtoverlay.py
new file mode 100644
index 000000000..577bafed9
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/dtoverlay.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2017 Ettus Research (National Instruments)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+Manipulation of device tree overlays (Linux kernel)
+"""
+
+import os
+from usrp_mpm.mpmlog import get_logger
+
+SYSFS_OVERLAY_BASE_DIR = '/sys/kernel/config/device-tree/overlays'
+OVERLAY_DEFAULT_PATH = '/lib/firmware'
+
+def get_overlay_attrs(overlay_name):
+ """
+ List those attributes that are connected to an overlay entry in a sysfs
+ node.
+ """
+ overlay_path = os.path.join(SYSFS_OVERLAY_BASE_DIR, overlay_name)
+ attrs = {}
+ for attr_name in os.listdir(overlay_path):
+ try:
+ attr_val = open(
+ os.path.join(overlay_path, attr_name), 'r'
+ ).read().strip()
+ except OSError:
+ pass
+ if len(attr_val):
+ attrs[attr_name] = attr_val
+ return attrs
+
+def is_applied(overlay_name):
+ """
+ Returns True if the overlay is already applied, False if not.
+ """
+ try:
+ return open(
+ os.path.join(SYSFS_OVERLAY_BASE_DIR, overlay_name, 'status')
+ ).read().strip() == 'applied'
+ except IOError:
+ return False
+
+def list_overlays(applied_only=False):
+ """
+ List all registered kernel modules. Returns a dict of dicts:
+ {
+ '<overlay_name>': {
+ # attributes
+ },
+ }
+
+ All the attributes come from sysfs. See get_overlay_attrs().
+
+ applied_only -- Only return those overlays that are already applied.
+ """
+ return {
+ overlay_name: get_overlay_attrs(overlay_name)
+ for overlay_name in os.listdir(SYSFS_OVERLAY_BASE_DIR)
+ if not applied_only \
+ or get_overlay_attrs(overlay_name).get('status') == 'applied'
+ }
+
+def list_available_overlays(path):
+ """
+ List available overlay files (dtbo)
+ """
+ path = path or OVERLAY_DEFAULT_PATH
+ return [x.strip()[:-5] for x in os.listdir(path) if x.endswith('.dtbo')]
+
+def apply_overlay(overlay_name):
+ """
+ Applies the given overlay. Does not check if the overlay is loaded.
+ """
+ get_logger("DTO").trace("Applying overlay `{}'...".format(overlay_name))
+ overlay_path = os.path.join(SYSFS_OVERLAY_BASE_DIR, overlay_name)
+ if not os.path.exists(overlay_path):
+ os.mkdir(overlay_path)
+ open(
+ os.path.join(SYSFS_OVERLAY_BASE_DIR, overlay_name, 'path'), 'w'
+ ).write("{}.dtbo".format(overlay_name))
+
+def apply_overlay_safe(overlay_name):
+ """
+ Only apply an overlay if it's not yet applied.
+
+ Finally, checks that the overlay was applied and throws an exception if not.
+ """
+ if is_applied(overlay_name):
+ get_logger("DTO").debug(
+ "Overlay `{}' was already applied, not applying again.".format(
+ overlay_name
+ )
+ )
+ else:
+ apply_overlay(overlay_name)
+ if not is_applied(overlay_name):
+ raise RuntimeError("Failed to apply overlay `{}'".format(overlay_name))
+
+def rm_overlay(overlay_name):
+ """
+ Removes the given overlay. Does not check if the overlay is loaded.
+ """
+ get_logger("DTO").trace("Removing overlay `{}'...".format(overlay_name))
+ os.rmdir(os.path.join(SYSFS_OVERLAY_BASE_DIR, overlay_name))
+
+def rm_overlay_safe(overlay_name):
+ """
+ Only remove an overlay if it's already applied.
+ """
+ if overlay_name in list(list_overlays(applied_only=True).keys()):
+ rm_overlay(overlay_name)
+ else:
+ get_logger("DTO").debug(
+ "Overlay `{}' was not loaded, not removing.".format(overlay_name)
+ )
+
diff --git a/mpm/python/usrp_mpm/sys_utils/net.py b/mpm/python/usrp_mpm/sys_utils/net.py
new file mode 100644
index 000000000..e3deea3e7
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/net.py
@@ -0,0 +1,130 @@
+
+# Copyright 2017 Ettus Research (National Instruments)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+N310 implementation module
+"""
+import itertools
+import socket
+from six import iteritems
+from pyroute2 import IPRoute
+from usrp_mpm.mpmlog import get_logger
+
+
+def get_valid_interfaces(iface_list):
+ """
+ Given a list of interfaces (['eth1', 'eth2'] for example), return the
+ subset that contains actually valid entries.
+ Interfaces are checked for if they actually exist, and if so, if they're up.
+ """
+ valid_ifaces = []
+ with IPRoute() as ipr:
+ for iface in iface_list:
+ valid_iface_idx = ipr.link_lookup(ifname=iface)
+ if len(valid_iface_idx) == 0:
+ continue
+ valid_iface_idx = valid_iface_idx[0]
+ link_info = ipr.get_links(valid_iface_idx)[0]
+ if link_info.get_attr('IFLA_OPERSTATE') == 'UP' \
+ and len(get_iface_addrs(link_info.get_attr('IFLA_ADDRESS'))):
+ assert link_info.get_attr('IFLA_IFNAME') == iface
+ valid_ifaces.append(iface)
+ return valid_ifaces
+
+
+def get_iface_info(ifname):
+ """
+ Given an interface name (e.g. 'eth1'), return a dictionary with the
+ following keys:
+ - ip_addr: Main IPv4 address
+ - ip_addrs: List of valid IPv4 addresses
+ - mac_addr: MAC address
+
+ All values are stored as strings.
+ """
+ try:
+ with IPRoute() as ipr:
+ links = ipr.link_lookup(ifname=ifname)
+ if len(links) == 0:
+ raise LookupError("No interfaces known with name `{}'!"
+ .format(ifname))
+ link_info = ipr.get_links(links)[0]
+ except IndexError:
+ raise LookupError("Could not get links for interface `{}'"
+ .format(ifname))
+ mac_addr = link_info.get_attr('IFLA_ADDRESS')
+ ip_addrs = get_iface_addrs(mac_addr)
+ return {
+ 'mac_addr': mac_addr,
+ 'ip_addr': ip_addrs[0],
+ 'ip_addrs': ip_addrs,
+ }
+
+def ip_addr_to_iface(ip_addr, iface_list):
+ """
+ Return an Ethernet interface (e.g. 'eth1') given an IP address.
+
+ Arguments:
+ ip_addr -- The IP address as a string
+ iface_list -- A map "interface name" -> iface_info, where iface_info
+ is another map as returned by net.get_iface_info().
+ """
+ # Flip around the iface_info map and then use it to look up by IP addr
+ return {
+ iface_info['ip_addr']: iface_name
+ for iface_name, iface_info in iteritems(iface_list)
+ }[ip_addr]
+
+
+def get_iface_addrs(mac_addr):
+ """
+ return ipv4 addresses for a given macaddress
+ input format: "aa:bb:cc:dd:ee:ff"
+ """
+ with IPRoute() as ip2:
+ # returns index
+ [link] = ip2.link_lookup(address=mac_addr)
+ # Only get v4 addresses
+ addresses = [addr.get_attrs('IFA_ADDRESS')
+ for addr in ip2.get_addr(family=socket.AF_INET)
+ if addr.get('index', None) == link]
+ # flatten possibly nested list
+ addresses = list(itertools.chain.from_iterable(addresses))
+
+ return addresses
+
+
+def byte_to_mac(byte_str):
+ """
+ converts a bytestring into nice hex representation
+ """
+ return ':'.join(["%02x" % ord(x) for x in byte_str])
+
+
+def get_mac_addr(remote_addr):
+ """
+ return MAC address of a remote host already discovered
+ or None if no host entry was found
+ """
+ with IPRoute() as ip2:
+ addrs = ip2.get_neighbours(dst=remote_addr)
+ if len(addrs) > 1:
+ get_logger('get_mac_addr').warning("More than one device with the "
+ "same IP address found. "
+ "Picking entry at random")
+ if not addrs:
+ return None
+ return addrs[0].get_attr('NDA_LLADDR')
diff --git a/mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py b/mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py
new file mode 100644
index 000000000..f8384a3b8
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/sysfs_gpio.py
@@ -0,0 +1,199 @@
+#
+# Copyright 2017 Ettus Research (National Instruments)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+Access to GPIOs mapped into the PS via sysfs
+"""
+
+import os
+from builtins import object
+import pyudev
+from usrp_mpm.mpmlog import get_logger
+
+GPIO_SYSFS_BASE_DIR = '/sys/class/gpio'
+GPIO_SYSFS_LABELFILE = 'label'
+GPIO_SYSFS_VALUEFILE = 'value'
+
+def get_all_gpio_devs(parent_dev=None):
+ """
+ Returns a list of all GPIO chips available through sysfs. Will look
+ something like ['gpiochip882', 'gpiochip123', ...]
+
+ If there are multiple devices with the same label (example: daughterboards
+ may have a single label for a shared component), a parent device needs to
+ be provided to disambiguate.
+
+ Arguments:
+ parent_dev -- A parent udev device. If this is provided, only GPIO devices
+ which are a child of the parent device are returned.
+
+ Example:
+ >>> parent_dev = pyudev.Devices.from_sys_path(
+ pyudev.Context(), '/sys/class/i2c-adapter/i2c-10')
+ >>> get_all_gpio_devs(parent_dev)
+ """
+ try:
+ context = pyudev.Context()
+ gpios = [device.sys_name
+ for device in context.list_devices(
+ subsystem="gpio").match_parent(parent_dev)
+ if device.device_number == 0
+ ]
+ return gpios
+ except OSError:
+ # Typically means GPIO not available, maybe no overlay
+ return []
+
+def get_gpio_map_info(gpio_dev):
+ """
+ Returns all the map info for a given GPIO device.
+ Example: If pio_dev is 'gpio882', it will list all files
+ in /sys/class/gpio/gpio882/ and create a dictionary with filenames
+ as keys and content as value. Subdirs are skipped.
+
+ Numbers are casted to numbers automatically. Strings remain strings.
+ """
+ map_info = {}
+ map_info_path = os.path.join(
+ GPIO_SYSFS_BASE_DIR, gpio_dev,
+ )
+ for info_file in os.listdir(map_info_path):
+ if not os.path.isfile(os.path.join(map_info_path, info_file)):
+ continue
+ map_info_value = open(os.path.join(map_info_path, info_file), 'r').read().strip()
+ try:
+ map_info[info_file] = int(map_info_value, 0)
+ except ValueError:
+ map_info[info_file] = map_info_value
+ # Manually add GPIO number
+ context = pyudev.Context()
+ map_info['sys_number'] = int(
+ pyudev.Devices.from_name(context, subsystem="gpio", sys_name=gpio_dev).sys_number
+ )
+ return map_info
+
+def find_gpio_device(label, parent_dev=None, logger=None):
+ """
+ Given a label, returns a tuple (uio_device, map_info).
+ uio_device is something like 'gpio882'. map_info is a dictionary with
+ information regarding the GPIO device read from the map info sysfs dir.
+ """
+ gpio_devices = get_all_gpio_devs(parent_dev)
+ if logger:
+ logger.trace("Found the following UIO devices: `{0}'".format(','.join(gpio_devices)))
+ for gpio_device in gpio_devices:
+ map_info = get_gpio_map_info(gpio_device)
+ if logger:
+ logger.trace("{0} has map info: {1}".format(gpio_device, map_info))
+ if map_info.get('label') == label:
+ if logger:
+ logger.trace("Device matches label: `{0}'".format(gpio_device))
+ return gpio_device, map_info
+ if logger:
+ logger.warning("Found no matching gpio device for label `{0}'".format(label))
+ return None, None
+
+class SysFSGPIO(object):
+ """
+ API for accessing GPIOs mapped into userland via sysfs
+ """
+
+ def __init__(self, label, use_mask, ddr, init_value=0, parent_dev=None):
+ assert (use_mask & ddr) == ddr
+ self.log = get_logger("SysFSGPIO")
+ self._label = label
+ self._use_mask = use_mask
+ self._ddr = ddr
+ self._init_value = init_value
+ self.log.trace("Generating SysFSGPIO object for label `{}'...".format(label))
+ self._gpio_dev, self._map_info = \
+ find_gpio_device(label, parent_dev, self.log)
+ if self._gpio_dev is None:
+ self.log.error("Could not find GPIO device with label `{}'.".format(label))
+ self.log.trace("GPIO base number is {}".format(self._map_info.get("sys_number")))
+ self._base_gpio = self._map_info.get("sys_number")
+ self.init(self._map_info['ngpio'],
+ self._base_gpio,
+ self._use_mask,
+ self._ddr,
+ self._init_value)
+
+ def init(self, n_gpio, base, use_mask, ddr, init_value=0):
+ """
+ Guarantees that all the devices are created accordingly
+
+ E.g., if use_mask & 0x1 is True, it makes sure that 'gpioXXX' is exported.
+ Also sets the DDRs.
+ """
+ gpio_list = [x for x in range(n_gpio) if (1<<x) & use_mask]
+ self.log.trace("Initializing {} GPIOs...".format(len(gpio_list)))
+ for gpio_idx in gpio_list:
+ gpio_num = base + gpio_idx
+ ddr_out = ddr & (1<<gpio_idx)
+ ini_v = init_value & (1<<gpio_idx)
+ gpio_path = os.path.join(GPIO_SYSFS_BASE_DIR, 'gpio{}'.format(gpio_num))
+ if not os.path.exists(gpio_path):
+ self.log.trace("Creating GPIO path `{}'...".format(gpio_path))
+ open(os.path.join(GPIO_SYSFS_BASE_DIR, 'export'), 'w').write('{}'.format(gpio_num))
+ ddr_str = 'out' if ddr_out else 'in'
+ ddr_str = 'high' if ini_v else ddr_str
+ self.log.trace("On GPIO path `{}', setting DDR mode to {}.".format(gpio_path, ddr_str))
+ open(os.path.join(GPIO_SYSFS_BASE_DIR, gpio_path, 'direction'), 'w').write(ddr_str)
+
+ def set(self, gpio_idx, value=None):
+ """
+ Assert a GPIO at given index.
+
+ Note: The GPIO must be in the valid range, and it's DDR value must be
+ high (for "out").
+ """
+ if value is None:
+ value = 1
+ assert (1<<gpio_idx) & self._use_mask
+ assert (1<<gpio_idx) & self._ddr
+ gpio_num = self._base_gpio + gpio_idx
+ gpio_path = os.path.join(GPIO_SYSFS_BASE_DIR, 'gpio{}'.format(gpio_num))
+ value_path = os.path.join(gpio_path, GPIO_SYSFS_VALUEFILE)
+ self.log.trace("Writing value `{}' to `{}'...".format(value, value_path))
+ assert os.path.exists(value_path)
+ open(value_path, 'w').write('{}'.format(value))
+
+ def reset(self, gpio_idx):
+ """
+ Deassert a GPIO at given index.
+
+ Note: The GPIO must be in the valid range, and it's DDR value must be
+ high (for "out").
+ """
+ self.set(gpio_idx, value=0)
+
+ def get(self, gpio_idx):
+ """
+ Read back a GPIO at given index.
+
+ Note: The GPIO must be in the valid range, and it's DDR value must be
+ low (for "in").
+ """
+ assert (1<<gpio_idx) & self._use_mask
+ assert (1<<gpio_idx) & (~self._ddr)
+ gpio_num = self._base_gpio + gpio_idx
+ gpio_path = os.path.join(GPIO_SYSFS_BASE_DIR, 'gpio{}'.format(gpio_num))
+ value_path = os.path.join(gpio_path, GPIO_SYSFS_VALUEFILE)
+ assert os.path.exists(value_path)
+ read_value = int(open(value_path, 'r').read().strip())
+ self.log.trace("Reading value {} from `{}'...".format(read_value, value_path))
+ return read_value
+
diff --git a/mpm/python/usrp_mpm/sys_utils/udev.py b/mpm/python/usrp_mpm/sys_utils/udev.py
new file mode 100644
index 000000000..87d264d2d
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/udev.py
@@ -0,0 +1,51 @@
+#
+# Copyright 2017 Ettus Research (National Instruments)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import pyudev
+
+def get_eeprom_paths(address):
+ """
+ Return list of EEPROM device paths for a given I2C address.
+ If no device paths are found, an empty list is returned.
+ """
+ context = pyudev.Context()
+ parent = pyudev.Device.from_name(context, "platform", address)
+ paths = [d.device_node if d.device_node is not None else d.sys_path
+ for d in context.list_devices(parent=parent, subsystem="nvmem")]
+ if len(paths) == 0:
+ return []
+ # We need to sort this so 9-0050 comes before 10-0050 (etc.)
+ maxlen = max((len(os.path.split(p)[1]) for p in paths))
+ paths = sorted(
+ paths,
+ key=lambda x: "{:>0{maxlen}}".format(os.path.split(x)[1], maxlen=maxlen)
+ )
+ return [os.path.join(x, 'nvmem') for x in paths]
+
+def get_spidev_nodes(spi_master):
+ """
+ Return list of spidev device paths for a given SPI master. If no valid paths
+ can be found, an empty list is returned.
+ """
+ context = pyudev.Context()
+ parent = pyudev.Device.from_name(context, "platform", spi_master)
+ return [
+ device.device_node
+ for device in context.list_devices(parent=parent, subsystem="spidev")
+ ]
+
diff --git a/mpm/python/usrp_mpm/sys_utils/uio.py b/mpm/python/usrp_mpm/sys_utils/uio.py
new file mode 100644
index 000000000..c07227776
--- /dev/null
+++ b/mpm/python/usrp_mpm/sys_utils/uio.py
@@ -0,0 +1,167 @@
+#
+# Copyright 2017 Ettus Research (National Instruments)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""
+Access to UIO mapped memory.
+"""
+
+import struct
+import os
+import mmap
+from builtins import hex
+from builtins import object
+import pyudev
+from usrp_mpm.mpmlog import get_logger
+
+UIO_SYSFS_BASE_DIR = '/sys/class/uio'
+UIO_DEV_BASE_DIR = '/dev'
+
+def get_all_uio_devs():
+ """
+ Return a list of all uio devices. Will look something like
+ ['uio0', 'uio1', ...].
+ """
+ try:
+ context = pyudev.Context()
+ paths = [os.path.split(device.device_node)[-1]
+ for device in context.list_devices(subsystem="uio")]
+ return paths
+ except OSError:
+ # Typically means UIO devices
+ return []
+
+def get_uio_map_info(uio_dev, map_num):
+ """
+ Returns all the map info for a given UIO device and map number.
+ Example: If uio_dev is 'uio0', and map_num is 0, it will list all files
+ in /sys/class/uio/uio0/maps/map0/ and create a dictionary with filenames
+ as keys and content as value.
+
+ Numbers are casted to numbers automatically. Strings remain strings.
+ """
+ map_info = {}
+ map_info_path = os.path.join(
+ UIO_SYSFS_BASE_DIR, uio_dev, 'maps', 'map{0}'.format(map_num)
+ )
+ for info_file in os.listdir(map_info_path):
+ map_info_value = open(os.path.join(map_info_path, info_file), 'r').read().strip()
+ try:
+ map_info[info_file] = int(map_info_value, 0)
+ except ValueError:
+ map_info[info_file] = map_info_value
+ return map_info
+
+def find_uio_device(label, logger=None):
+ """
+ Given a label, returns a tuple (uio_device, map_info).
+ uio_device is something like '/dev/uio0'. map_info is a dictionary with
+ information regarding the UIO device read from the map info sysfs dir.
+ Note: We assume a single map (map0) for all UIO devices here.
+ """
+ uio_devices = get_all_uio_devs()
+ if logger:
+ logger.trace("Found the following UIO devices: `{0}'".format(','.join(uio_devices)))
+ for uio_device in uio_devices:
+ map0_info = get_uio_map_info(uio_device, 0)
+ logger.trace("{0} has map info: {1}".format(uio_device, map0_info))
+ if map0_info.get('name') == label:
+ if logger:
+ logger.trace("Device matches label: `{0}'".format(uio_device))
+ return os.path.join(UIO_DEV_BASE_DIR, uio_device), map0_info
+ if logger:
+ logger.warning("Found no matching UIO device for label `{0}'".format(label))
+ return None, None
+
+class UIO(object):
+ """
+ Provides peek/poke interfaces for uio-mapped memory.
+
+ Arguments:
+ label -- Label of the UIO device. The label is set in the device tree
+ overlay
+ path -- Path to UIO device, e.g. '/dev/uio0'. This is ignored if 'label' is
+ provided.
+ length -- Number of bytes in the address space (is passed to mmap.mmap).
+ This is usually automatically determined. No need to set it.
+ Unless you really know what you're doing.
+ read_only -- Boolean; True == ro, False == rw
+ offset -- Passed to mmap.mmap.
+ This is usually automatically determined. No need to set it.
+ Unless you really know what you're doing.
+ """
+ def __init__(self, label=None, path=None, length=None, read_only=True, offset=None):
+ self.log = get_logger('UIO')
+ if label is None:
+ self._path = path
+ self.log.trace("Using UIO device `{0}'".format(path))
+ uio_device = os.path.split(path)[-1]
+ self.log.trace("Getting map info for UIO device `{0}'".format(uio_device))
+ map_info = get_uio_map_info(uio_device, 0)
+ # Python can't tell the size of a uio device by itself
+ assert length is not None
+ else:
+ self.log.trace("Using UIO device by label `{0}'".format(label))
+ self._path, map_info = find_uio_device(label, self.log)
+ offset = offset or map_info['offset'] # If we ever support multiple maps, check if this is correct...
+ assert offset == 0 # ...and then remove this line
+ length = length or map_info['size']
+ self.log.trace("UIO device is being opened read-{0}.".format("only" if read_only else "write"))
+ if self._path is None:
+ self.log.error("Could not find a UIO device for label {0}".format(label))
+ raise RuntimeError("Could not find a UIO device for label {0}".format(label))
+ self._read_only = read_only
+ self.log.trace("Opening UIO device file {}...".format(self._path))
+ self._fd = os.open(self._path, os.O_RDONLY if read_only else os.O_RDWR)
+ self.log.trace("Calling mmap({fd}, length={length}, offset={offset})".format(
+ fd=self._fd, length=hex(length), offset=hex(offset)
+ ))
+ self._mm = mmap.mmap(
+ self._fd,
+ length,
+ flags=mmap.MAP_SHARED,
+ prot=mmap.PROT_READ | (0 if read_only else mmap.PROT_WRITE),
+ offset=offset,
+ )
+
+ def __del__(self):
+ """
+ Destructor needs to close the uio-mapped memory
+ """
+ try:
+ self._mm.close()
+ os.close(self._fd)
+ except:
+ self.log.warning("Failed to properly destruct UIO object.")
+ pass
+
+ def peek32(self, addr):
+ """
+ Returns the 32-bit value starting at address addr as an integer
+ """
+ return struct.unpack('@I', self._mm[addr:addr+4])[0]
+
+ def poke32(self, addr, val):
+ """
+ Writes the 32-bit value val to address starting at addr.
+ Will throw if read_only was set to True.
+ A value that exceeds 32 bits will be truncated to 32 bits.
+ """
+ assert not self._read_only
+ self._mm[addr:addr+4] = struct.pack(
+ '@I',
+ (val & 0xFFFFFFFF),
+ )
+