aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/periph_manager
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/periph_manager')
-rw-r--r--mpm/python/usrp_mpm/periph_manager/CMakeLists.txt1
-rw-r--r--mpm/python/usrp_mpm/periph_manager/sim.py302
2 files changed, 303 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/CMakeLists.txt b/mpm/python/usrp_mpm/periph_manager/CMakeLists.txt
index 4f8520c12..747b8967a 100644
--- a/mpm/python/usrp_mpm/periph_manager/CMakeLists.txt
+++ b/mpm/python/usrp_mpm/periph_manager/CMakeLists.txt
@@ -18,6 +18,7 @@ set(USRP_MPM_PERIPHMGR_FILES
${CMAKE_CURRENT_SOURCE_DIR}/e320_periphs.py
${CMAKE_CURRENT_SOURCE_DIR}/e31x.py
${CMAKE_CURRENT_SOURCE_DIR}/e31x_periphs.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/sim.py
)
list(APPEND USRP_MPM_FILES ${USRP_MPM_PERIPHMGR_FILES})
set(USRP_MPM_FILES ${USRP_MPM_FILES} PARENT_SCOPE)
diff --git a/mpm/python/usrp_mpm/periph_manager/sim.py b/mpm/python/usrp_mpm/periph_manager/sim.py
new file mode 100644
index 000000000..00ba5ba38
--- /dev/null
+++ b/mpm/python/usrp_mpm/periph_manager/sim.py
@@ -0,0 +1,302 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+usrp simulation module
+
+This module is used to emulate a usrp when running on a standard
+computer. You can build mpm in this configuration by using the cmake
+flag -DMPM_DEVICE=sim
+"""
+
+from pyroute2 import IPRoute
+from usrp_mpm.xports import XportMgrUDP
+from usrp_mpm.mpmlog import get_logger
+from usrp_mpm.rpc_server import no_claim
+from usrp_mpm.periph_manager import PeriphManagerBase
+
+CLOCK_SOURCE_INTERNAL = "internal"
+
+E320_DBOARD_SLOT_IDX = 0
+
+class SimXportMgrUDP(XportMgrUDP):
+ """This is an adaptor class for the normal XportMgrUDP
+ In radios, the interface names are hardcoded. Since we are on a
+ desktop computer, we generate the names at runtime.
+ """
+ def __init__(self, log, args, eth_dispatcher_cls):
+ with IPRoute() as ipr:
+ self.iface_config = {
+ link.get_attr('IFLA_IFNAME'): {
+ 'label': link.get_attr('IFLA_IFNAME'),
+ 'type': 'forward'
+ } for link in ipr.get_links()
+ }
+ super().__init__(log, args, eth_dispatcher_cls)
+
+class SimEthDispatcher:
+ """This is the hardware specific part of the normal XportMgrUDP
+ that we have to simulate. We get the ipv4 addr with IPRoute
+ instead of registers
+ """
+ DEFAULT_VITA_PORT = (49153, 49154)
+ LOG = None
+
+ def __init__(self, if_name):
+ self.log = get_logger(if_name)
+ self.if_name = if_name
+
+ def set_ipv4_addr(self, addr):
+ """This doesn't actually change the ipv4 address, it just
+ checks to make sure the requested address is already our
+ address, and complains otherwise.
+ """
+ with IPRoute() as ipr:
+ valid_iface_idx = ipr.link_lookup(ifname=self.if_name)[0]
+ link_info = ipr.get_links(valid_iface_idx)[0]
+ real_addr = link_info.get_attr('IFLA_ADDRESS')
+ if addr != real_addr:
+ self.log.warning("Cannot change ip address on simulator! Requested: {}, Actual: {}"
+ .format(addr, real_addr))
+
+class sim(PeriphManagerBase):
+ """This is a periph manager that is designed to run on a regular
+ computer rather than the arm core on an SDR
+ """
+ #########################################################################
+ # Overridables
+ #
+ # See PeriphManagerBase for documentation on these fields
+ #########################################################################
+ description = "E320-Series Device - SIMULATED"
+ pids = {0xE320: 'e320'}
+
+ mboard_info = {"type": "e3xx", "product": "e320"}
+ mboard_max_rev = 7 # RevC
+ mboard_sensor_callback_map = {}
+
+ ###########################################################################
+ # Ctor and device initialization tasks
+ ###########################################################################
+ def __init__(self, args):
+ super().__init__()
+ self.device_id = 1
+
+ # Unlike the real hardware drivers, if there is an exception here,
+ # we just crash. No use missing an error when testing.
+ self._init_peripherals(args)
+ self.init_dboards(args)
+ if not args.get('skip_boot_init', False):
+ self.init(args)
+
+ @classmethod
+ def generate_device_info(cls, eeprom_md, mboard_info, dboard_infos):
+ """
+ Hard-code our product map
+ """
+ # Add the default PeriphManagerBase information first
+ device_info = super().generate_device_info(
+ eeprom_md, mboard_info, dboard_infos)
+ # Then add device-specific information
+ mb_pid = eeprom_md.get('pid')
+ device_info['product'] = cls.pids.get(mb_pid, 'unknown')
+ return device_info
+
+ def _read_mboard_eeprom(self):
+ """
+ Read out a simulated mboard eeprom and saves it to the appropriate member variable
+ """
+ self._eeprom_head = sim._generate_eeprom_head()
+
+ self.log.trace("Found EEPROM metadata: '{}'"
+ .format(str(self._eeprom_head)))
+ return (self._eeprom_head, None)
+
+ @staticmethod
+ def _generate_eeprom_head(serial=b'3196D2A', rev=2, rev_compat=2):
+ return {'pid': 0xE320,
+ 'rev': rev,
+ 'rev_compat': rev_compat,
+ 'serial': serial}
+
+ def _init_peripherals(self, args):
+ """
+ Turn on all peripherals. This may throw an error on failure, so make
+ sure to catch it.
+
+ Peripherals are initialized in the order of least likely to fail, to most
+ likely.
+ """
+ # Sanity checks
+ assert self.mboard_info.get('product') in self.pids.values(), \
+ "Device product could not be determined!"
+ # Init peripherals
+
+ # Init CHDR transports
+ self._xport_mgrs = {
+ 'udp': SimXportMgrUDP(self.log, args, SimEthDispatcher)
+ }
+ #TODO: Actually create transports here when RFNoC is integrated
+ self.log.trace("CHDR transport creation was skipped")
+
+ # Init complete.
+ self.log.debug("Device info: {}".format(self.device_info))
+
+ ###########################################################################
+ # Device info
+ ###########################################################################
+ def get_device_info_dyn(self):
+ """
+ Append the device info with current IP addresses.
+ """
+ if not self._device_initialized:
+ return {}
+ device_info = self._xport_mgrs['udp'].get_xport_info()
+ self.log.warn("get_device_info_dyn() - FPGA functionality not implemented yet")
+ return device_info
+
+ def set_device_id(self, device_id):
+ """
+ Sets the device ID for this motherboard.
+ The device ID is used to identify the RFNoC components associated with
+ this motherboard.
+ """
+ self.device_id = device_id
+
+ def get_device_id(self):
+ """
+ Gets the device ID for this motherboard.
+ The device ID is used to identify the RFNoC components associated with
+ this motherboard.
+ """
+ return self.device_id
+
+ @no_claim
+ def get_proto_ver(self):
+ """
+ Return RFNoC protocol version
+ """
+ return 0x100
+
+ @no_claim
+ def get_chdr_width(self):
+ """
+ Return RFNoC CHDR width
+ """
+ return 64
+
+ ###########################################################################
+ # Transport API
+ ###########################################################################
+ def get_chdr_link_types(self):
+ """
+ This will only ever return a single item (udp).
+ """
+ return ["udp"]
+
+ def get_chdr_link_options(self, xport_type):
+ """
+ Returns a list of dictionaries. Every dictionary contains information
+ about one way to connect to this device in order to initiate CHDR
+ traffic.
+
+ The interpretation of the return value is very highly dependant on the
+ transport type (xport_type).
+ For UDP, the every entry of the list has the following keys:
+ - ipv4 (IP Address)
+ - port (UDP port)
+ - link_rate (bps of the link, e.g. 10e9 for 10GigE)
+ """
+ if xport_type not in self._xport_mgrs:
+ self.log.warning("Can't get link options for unknown link type: '{}'."
+ .format(xport_type))
+ return []
+ return self._xport_mgrs[xport_type].get_chdr_link_options()
+
+ #######################################################################
+ # Timekeeper API
+ #######################################################################
+ def get_master_clock_rate(self):
+ """ Return the master clock rate set during init """
+ return self._master_clock_rate
+
+ def get_num_timekeepers(self):
+ """
+ Return the number of timekeepers
+ """
+ return 1
+
+ def set_timekeeper_time(self, tk_idx, ticks, next_pps):
+ """
+ Set the time in ticks
+
+ Arguments:
+ tk_idx: Index of timekeeper
+ ticks: Time in ticks
+ next_pps: If True, set time at next PPS. Otherwise, set time now.
+ """
+ self.log.debug("Setting timekeeper time (tx_idx:{}, ticks: {}, next_pps: {})"
+ .format(tk_idx, ticks, next_pps))
+
+ def get_timekeeper_time(self, tk_idx, last_pps):
+ """
+ Get the time in ticks
+
+ Arguments:
+ tk_idx: Index of timekeeper
+ next_pps: If True, get time at last PPS. Otherwise, get time now.
+ """
+ return 0
+
+ def set_tick_period(self, tk_idx, period_ns):
+ """
+ Set the time per tick in nanoseconds (tick period)
+
+ Arguments:
+ tk_idx: Index of timekeeper
+ period_ns: Period in nanoseconds
+ """
+ self.log.debug("Setting tick period (tk_idx: {}, period_ns: {})"
+ .format(tk_idx, period_ns))
+
+ def get_clocks(self):
+ """
+ Gets the RFNoC-related clocks present in the FPGA design
+ """
+ return [
+ {
+ 'name': 'radio_clk',
+ 'freq': str(122.88e6),
+ 'mutable': 'true'
+ },
+ {
+ 'name': 'bus_clk',
+ 'freq': str(200e6),
+ },
+ {
+ 'name': 'ctrl_clk',
+ 'freq': str(40e6),
+ }
+ ]
+
+ def get_time_sources(self):
+ " Returns list of valid time sources "
+ return (CLOCK_SOURCE_INTERNAL,)
+
+ def get_clock_sources(self):
+ " Lists all available clock sources. "
+ return (CLOCK_SOURCE_INTERNAL,)
+
+ def get_clock_source(self):
+ " Returns the current Clock Source "
+ return CLOCK_SOURCE_INTERNAL
+
+ def set_clock_source(self, source):
+ " No-op which sets the clock source on a real radio "
+ self.log.debug("Setting clock source to {}".format(source))
+
+ def set_channel_mode(self, channel_mode):
+ " No-op which sets the channel mode on a real radio "
+ self.log.debug("Using channel mode {}".format(channel_mode))