diff options
Diffstat (limited to 'mpm/python/usrp_mpm')
| -rw-r--r-- | mpm/python/usrp_mpm/periph_manager/sim.py | 13 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_sniffer.py | 107 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/noc_block_regs.py | 312 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/rfnoc_graph.py | 554 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/stream_ep_regs.py | 129 | 
6 files changed, 1114 insertions, 5 deletions
| diff --git a/mpm/python/usrp_mpm/periph_manager/sim.py b/mpm/python/usrp_mpm/periph_manager/sim.py index 7c8baeb8f..7f9a610a7 100644 --- a/mpm/python/usrp_mpm/periph_manager/sim.py +++ b/mpm/python/usrp_mpm/periph_manager/sim.py @@ -16,6 +16,7 @@ from usrp_mpm.mpmlog import get_logger  from usrp_mpm.rpc_server import no_claim  from usrp_mpm.periph_manager import PeriphManagerBase  from usrp_mpm.simulator.sim_dboard_catalina import SimulatedCatalinaDboard +from usrp_mpm.simulator.chdr_sniffer import ChdrSniffer  CLOCK_SOURCE_INTERNAL = "internal" @@ -67,8 +68,6 @@ class sim(PeriphManagerBase):      """      #########################################################################      # Overridables -    # -    # See PeriphManagerBase for documentation on these fields      #########################################################################      description = "E320-Series Device - SIMULATED"      pids = {0xE320: 'e320'} @@ -84,6 +83,8 @@ class sim(PeriphManagerBase):          super().__init__()          self.device_id = 1 +        self.chdr_sniffer = ChdrSniffer(self.log, args) +          # Unlike the real hardware drivers, if there is an exception here,          # we just crash. No use missing an error when testing.          self._init_peripherals(args) @@ -91,8 +92,9 @@ class sim(PeriphManagerBase):          if not args.get('skip_boot_init', False):              self.init(args) -    def _simulator_frequency(self, freq): -        self.log.debug("Setting Simulator Sample Frequency to {}".format(freq)) +    def _simulator_sample_rate(self, freq): +        self.log.debug("Setting Simulator Sample Rate to {}".format(freq)) +        self.chdr_endpoint.set_sample_rate(freq)      @classmethod      def generate_device_info(cls, eeprom_md, mboard_info, dboard_infos): @@ -148,7 +150,8 @@ class sim(PeriphManagerBase):          self.log.debug("Device info: {}".format(self.device_info))      def _init_dboards(self, dboard_infos, override_dboard_pids, default_args): -        self.dboards.append(SimulatedCatalinaDboard(E320_DBOARD_SLOT_IDX, self._simulator_frequency)) +        self.dboards.append(SimulatedCatalinaDboard( +            E320_DBOARD_SLOT_IDX, self._simulator_sample_rate))          self.log.info("Found %d daughterboard(s).", len(self.dboards))      ########################################################################### diff --git a/mpm/python/usrp_mpm/simulator/CMakeLists.txt b/mpm/python/usrp_mpm/simulator/CMakeLists.txt index 6cc4ee441..82fcbd801 100644 --- a/mpm/python/usrp_mpm/simulator/CMakeLists.txt +++ b/mpm/python/usrp_mpm/simulator/CMakeLists.txt @@ -13,6 +13,10 @@ set(USRP_MPM_SIMULATOR_FILES      ${CMAKE_CURRENT_SOURCE_DIR}/__init__.py      ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard.py      ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard_catalina.py +    ${CMAKE_CURRENT_SOURCE_DIR}/chdr_sniffer.py +    ${CMAKE_CURRENT_SOURCE_DIR}/noc_block_regs.py +    ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_graph.py +    ${CMAKE_CURRENT_SOURCE_DIR}/stream_ep_regs.py  )  list(APPEND USRP_MPM_FILES ${USRP_MPM_SIMULATOR_FILES})  set(USRP_MPM_FILES ${USRP_MPM_FILES} PARENT_SCOPE) diff --git a/mpm/python/usrp_mpm/simulator/chdr_sniffer.py b/mpm/python/usrp_mpm/simulator/chdr_sniffer.py new file mode 100644 index 000000000..8ff467cf2 --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/chdr_sniffer.py @@ -0,0 +1,107 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module houses the ChdrSniffer class, which handles networking, +packet dispatch, and the nitty gritty of spinning up rx and tx workers. + +Note: Rx and Tx are reversed when compared to their definitions in the +UHD radio_control_impl.cpp file. Rx is when samples are coming to the +simulator. Tx is when samples are being sent by the simulator. + +TODO: This class is run based on threads for development simplicity. If +more throughput is desired, it should be rewritten to use processes +instead. This allows the socket workers to truly work in parallel. +Python threads are limited by holding the GIL while they are executing. +""" + +from threading import Thread +import socket +from uhd.chdr import ChdrPacket, ChdrWidth, PacketType +from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType + +CHDR_W = ChdrWidth.W64 + +class ChdrSniffer: +    """This class is created by the sim periph_manager +    It is responsible for opening sockets, dispatching all chdr packet +    traffic to the appropriate destination, and responding to said +    traffic. + +    The extra_args parameter is passed in from the periph_manager, and +    coresponds to the --default_args flag of usrp_hwd.py on the +    command line +    """ +    def __init__(self, log, extra_args): +        self.log = log.getChild("ChdrSniffer") +        self.thread = Thread(target=self.socket_worker, daemon=True) +        self.thread.start() +        self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx, +                                self.end_tx, self.send_strc, self.begin_rx) +        self.xport_map = {} + +    def set_sample_rate(self, rate): +        """Set the sample_rate of the next tx_stream. + +        This method is called by the daughterboard. It coresponds to +        sim_dboard.py:sim_db#set_catalina_clock_rate() +        """ +        self.graph.get_stream_spec().sample_rate = rate + +    def get_default_nodes(self): +        """Get a sensible NoC Core setup. This is the simplest +        functional layout. It has one of each required component. +        """ +        nodes = [ +            XportNode(0), +            XbarNode(0, [2], [0]), +            StreamEndpointNode(0) +        ] +        return nodes + +    def send_strc(self, stream_ep, addr): +        pass # TODO: currently not implemented + +    def begin_tx(self, src_epid, stream_spec): +        pass # TODO: currently not implemented + +    def end_tx(self, src_epid): +        pass # TODO: currently not implemented + +    def begin_rx(self, dst_epid): +        pass # TODO: currently not implemented + +    def socket_worker(self): +        """This is the method that runs in a background thread. It +        blocks on the CHDR socket and processes packets as they come +        in. +        """ +        self.log.info("Starting ChdrSniffer Thread") +        main_sock = socket.socket(socket.AF_INET, +                                  socket.SOCK_DGRAM) +        main_sock.bind(("0.0.0.0", 49153)) + +        while True: +            # This allows us to block on multiple sockets at the same time +            buffer = bytearray(8192) # Max MTU +            # received Data over socket +            n_bytes, sender = main_sock.recvfrom_into(buffer) +            self.log.trace("received {} bytes of data from {}" +                           .format(n_bytes, sender)) +            try: +                packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes]) +                self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload())) +                entry_xport = (1, NodeType.XPORT, 0) +                pkt_type = packet.get_header().pkt_type +                response = self.graph.handle_packet(packet, entry_xport, sender) + +                if response is not None: +                    data = response.serialize() +                    self.log.trace("Returning Packet: {}" +                                   .format(packet.to_string_with_payload())) +                    main_sock.sendto(bytes(data), sender) +            except BaseException as ex: +                self.log.warning("Unable to decode packet: {}" +                                 .format(ex)) +                raise ex diff --git a/mpm/python/usrp_mpm/simulator/noc_block_regs.py b/mpm/python/usrp_mpm/simulator/noc_block_regs.py new file mode 100644 index 000000000..b488ed43f --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/noc_block_regs.py @@ -0,0 +1,312 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +# Read Register Addresses +#! Register address of the protocol version +PROTOVER_ADDR = 0 * 4 +#! Register address of the port information +PORT_CNT_ADDR = 1 * 4 +#! Register address of the edge information +EDGE_CNT_ADDR = 2 * 4 +#! Register address of the device information +DEVICE_INFO_ADDR = 3 * 4 +#! Register address of the controlport information +CTRLPORT_CNT_ADDR = 4 * 4 +#! (Write) Register address of the flush and reset controls +FLUSH_RESET_ADDR = 1 * 4 + +#! Base address of the adjacency list +ADJACENCY_BASE_ADDR = 0x10000 +#! Each port is allocated this many registers in the backend register space +REGS_PER_PORT = 16 + +REG_RX_MAX_WORDS_PER_PKT = 0x28 +REG_RX_CMD_NUM_WORDS_HI = 0x1C +REG_RX_CMD_NUM_WORDS_LO = 0x18 +REG_RX_CMD = 0x14 + +RX_CMD_CONTINUOUS = 0x2 +RX_CMD_STOP = 0x0 +RX_CMD_FINITE = 0x1 + +RADIO_BASE_ADDR = 0x1000 +REG_CHAN_OFFSET = 128 # 0x80 + + +class StreamEndpointPort: +    """Represents a port on a Stream Endpoint + +    inst should be the same as the stream endpoint's node_inst +    """ +    def __init__(self, inst, port): +        self.inst = inst +        self.port = port +    def to_tuple(self, num_stream_ep): +        # The entry in an adjacency list is (blk_id, block_port) +        # where blk_id for stream endpoints starts at 1 +        # and Noc Blocks are addressed after the last stream endpoint +        # See rfnoc_graph.cpp +        return (1 + self.inst, self.port) + +class NocBlockPort: +    """Represents a port on a Noc Block""" +    def __init__(self, inst, port): +        self.inst = inst +        self.port = port + +    def to_tuple(self, num_stream_ep): +        # The entry in an adjacency list is (blk_id, block_port) +        # where blk_id for stream endpoints starts at 1 +        # and Noc Blocks are addressed after the last stream endpoint +        # See rfnoc_graph.cpp +        return (1 + num_stream_ep + self.inst, self.port) + +class NocBlock: +    """Represents a NocBlock + +    see client_zero.hpp:block_config_info + +    NOTE: The mtu in bytes is calculated by (2**data_mtu * CHDR_W) +    """ +    def __init__(self, protover, num_inputs, num_outputs, ctrl_fifo_size, +                 ctrl_max_async_msgs, noc_id, data_mtu): +        self.protover = protover +        self.num_inputs = num_inputs +        self.num_outputs = num_outputs +        self.ctrl_fifo_size = ctrl_fifo_size +        self.ctrl_max_async_msgs = ctrl_max_async_msgs +        self.noc_id = noc_id +        self.data_mtu = data_mtu + +    def read_reg(self, reg_num): +        # See client_zero.cpp +        if reg_num == 0: +            return self.read_config() +        elif reg_num == 1: +            return self.read_noc_id() +        elif reg_num == 2: +            return self.read_data() +        else: +            raise RuntimeError("NocBlock doesn't have a register #{}".format(reg_num)) + +    def read_config(self): +        return (self.protover & 0x3F) | \ +            ((self.num_inputs & 0x3F) << 6) | \ +            ((self.num_outputs & 0x3F) << 12) | \ +            ((self.ctrl_fifo_size & 0x3F) << 18) | \ +            ((self.ctrl_max_async_msgs & 0xFF) << 24) + +    def read_data(self): +        return (self.data_mtu & 0x3F) << 2 | \ +            (1 << 1) # Permanently Set flush done + +    def read_noc_id(self): +        return self.noc_id & 0xFFFFFFFF + +class NocBlockRegs: +    """Represents registers associated whith a group of NoCBlocks +    roughly similar to UHD's client_zero + +    NOTE: Many write operations are currently unimplemented and simply no-op +    """ +    def __init__(self, log, protover, has_xbar, num_xports, blocks, num_stream_ep, num_ctrl_ep, +                 device_type, adjacency_list, sample_width, samples_per_cycle, get_stream_spec, +                 create_tx_stream, stop_tx_stream): +        """ Args: +        protover -> FPGA Compat number +        has_xbar -> Is there a chdr xbar? +        num_xports -> how many xports +        blocks -> list of NocBlock objects +        num_stream_ep -> how many stream endpoints +        num_ctrl_ep -> how many ctrl endpoints +        device_type -> the device type (see defaults.hpp:device_type_t in UHD) +        adjacency_list -> List of (Port, Port) tuples where +            Port is either StreamEndpointPort or NocBlockPort +        sample_width -> Sample width of radio +        samples_per_cycle -> Samples produced by a radio cycle +        get_stream_spec -> Callback which returns the current stream spec +        create_tx_stream -> Callback which takes a block_index and starts a tx stream +        stop_tx_stream -> Callback which takes a block_index and stops a tx stream +        """ +        self.log = log.getChild("Regs") +        self.protover = protover +        self.has_xbar = has_xbar +        self.num_xports = num_xports +        self.blocks = blocks +        self.num_blocks = len(blocks) +        self.num_stream_ep = num_stream_ep +        self.num_ctrl_ep = num_ctrl_ep +        self.device_type = device_type +        self.adjacency_list = [(src_blk.to_tuple(num_stream_ep), dst_blk.to_tuple(num_stream_ep)) +                               for src_blk, dst_blk in adjacency_list] +        self.adjacency_list_reg = NocBlockRegs._parse_adjacency_list(self.adjacency_list) +        self.sample_width = sample_width +        self.samples_per_cycle = samples_per_cycle +        self.radio_reg = {} +        self.get_stream_spec = get_stream_spec +        self.create_tx_stream = create_tx_stream +        self.stop_tx_stream = stop_tx_stream + +    def read(self, addr): +        # See client_zero.cpp +        if addr == PROTOVER_ADDR: +            return self.read_protover() +        elif addr == PORT_CNT_ADDR: +            return self.read_port_cnt() +        elif addr == EDGE_CNT_ADDR: +            return self.read_edge_cnt() +        elif addr == DEVICE_INFO_ADDR: +            return self.read_device_info() +        elif addr == CTRLPORT_CNT_ADDR: +            return self.read_ctrlport_cnt() +        elif addr >= 0x40 and addr < 0x1000: +            return self.read_port_reg(addr) +        # See radio_control_impl.cpp +        elif addr >= 0x1000 and addr < 0x10000: +            return self.read_radio(addr) +        # See client_zero.cpp +        elif addr >= 0x10000: +            return self.read_adjacency_list(addr) +        else: +            raise RuntimeError("Unsupported register addr: 0x{:08X}".format(addr)) + +    def read_radio(self, addr): +        if addr == 0x1000: +            raise NotImplementedError() # TODO: This should be REG_COMPAT +        elif addr == 0x1004: +            return self.read_radio_width() +        else: +            offset = addr - 0x1000 +            chan = offset // 0x80 +            radio_offset = offset % 0x80 +            if radio_offset == 0x40: +                return self.radio_reg +            elif radio_offset == 0x3C: +                return self.radio_reg +            else: +                raise NotImplementedError("Radio addr 0x{:08X} not implemented".format(addr)) + +    def write_radio(self, addr, value): +        """Write a value to radio registers + +        See radio_control_impl.cpp +        """ +        offset = addr - 0x1000 +        assert offset >= 0 +        chan = offset // 0x80 +        if chan > 0: # For now, just operate as if there is one channel +            self.log.warn("Channel {} not suported".format(chan)) +            return +        reg = offset % 0x80 +        if reg == REG_RX_MAX_WORDS_PER_PKT: +            self.get_stream_spec().packet_samples = value +        elif reg == REG_RX_CMD_NUM_WORDS_HI: +            self.get_stream_spec().set_num_words_hi(value) +        elif reg == REG_RX_CMD_NUM_WORDS_LO: +            self.get_stream_spec().set_num_words_lo(value) +        elif reg == REG_RX_CMD: +            if value & (1 << 31) != 0: +                self.log.warn("Timed Streams are not supported. Starting immediately") +                value = value & ~(1 << 31) # Clear the flag +            if value == RX_CMD_STOP: +                sep_block_id = self.resolve_ep_towards_outputs((self.get_radio_port(), chan)) +                self.stop_tx_stream(sep_block_id) +                return +            elif value == RX_CMD_CONTINUOUS: +                self.get_stream_spec().is_continuous = True +            elif value == RX_CMD_FINITE: +                self.get_stream_spec().is_continuous = False +            else: +                raise RuntimeError("Unknown Stream RX_CMD: {:08X}".format(value)) +            sep_block_id = self.resolve_ep_towards_outputs((self.get_radio_port(), chan)) +            self.create_tx_stream(sep_block_id) + +    def resolve_ep_towards_outputs(self, block_id): +        """Follow dataflow downstream through the adjacency list until +        a stream_endpoint is encountered +        """ +        for src_blk, dst_blk in self.adjacency_list: +            if src_blk == block_id: +                dst_index, dst_port = dst_blk +                if dst_index <= self.num_stream_ep: +                    return dst_blk +                else: +                    return self.resolve_ep_towards_outputs(dst_blk) + +    def get_radio_port(self): +        """Returns the block_id of the radio block""" +        radio_noc_id = 0x12AD1000 +        for i, block in enumerate(self.blocks): +            if block.noc_id == radio_noc_id: +                return i + 1 + self.num_stream_ep + +    # This is the FPGA compat number +    def read_protover(self): +        return 0xFFFF & self.protover + +    def read_port_cnt(self): +        return (self.num_stream_ep & 0x3FF) | \ +            ((self.num_blocks & 0x3FF) << 10) | \ +            ((self.num_xports & 0x3FF) << 20) | \ +            ((1 if self.has_xbar else 0) << 31) + +    def read_edge_cnt(self): +        return len(self.adjacency_list) + +    def read_device_info(self): +        return (self.device_type & 0xFFFF) << 16 + +    def read_ctrlport_cnt(self): +        return (self.num_ctrl_ep & 0x3FF) + +    def read_adjacency_list(self, addr): +        offset = addr & 0xFFFF +        if offset == 0: +            self.log.debug("Adjacency List has {} entries".format(len(self.adjacency_list_reg))) +            return len(self.adjacency_list_reg) +        else: +            assert(offset % 4 == 0) +            index = (offset // 4) - 1 +            return self.adjacency_list_reg[index] + +    def write(self, addr, value): +        if addr == 0x1040 or addr == 0x10C0: +            self.log.trace("Storing value: 0x:{:08X} to self.radio_reg for data loopback test".format(value)) +            self.radio_reg = value +        # assuming 2 channels, out of bounds is +        # BASE + 2 * CHAN_OFFSET = 0x1000 + 2 * 0x80 = 0x1100 +        elif 0x1000 <= addr < 0x1100: +            self.write_radio(addr, value) + +    def read_port_reg(self, addr): +        port = addr // 0x40 +        if port < self.num_stream_ep: +            raise NotImplementedError() +        else: +            block = port - self.num_stream_ep - 1 +            offset = (addr % 0x40) // 4 +            return self.blocks[block].read_reg(offset) + +    def read_radio_width(self): +        return (self.samples_per_cycle & 0xFFFF) | \ +            ((self.sample_width & 0xFFFF) << 16) + +    @staticmethod +    def _parse_adjacency_list(adj_list): +        """Serialize an adjacency list from the form of +        [((src_blk, src_port), (dst_blk, dst_port))] + +        See client_zero.cpp:client_zero#_get_adjacency_list() +        """ +        def pack(blocks): +            src_blk, src_port = blocks[0] +            dst_blk, dst_port = blocks[1] +            return ((src_blk & 0x3FF) << 22) | \ +                ((src_port & 0x3F) << 16) | \ +                ((dst_blk & 0x3FF) << 6) | \ +                ((dst_port & 0x3F) << 0) +        return [pack(blocks) for blocks in adj_list] + diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py new file mode 100644 index 000000000..42210ab6e --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py @@ -0,0 +1,554 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module includes all of the components necessary to simulate the +configuration of a NoC Core and NoC Blocks. + +This module handles and responds to Management and Ctrl Packets. It +also instantiates the registers and acts as an interface between +the chdr packets on the network and the registers. +""" +from enum import IntEnum +from uhd.chdr import PacketType, MgmtOp, MgmtOpCode, MgmtOpNodeInfo, \ +    CtrlStatus, CtrlOpCode, MgmtOpCfg, MgmtOpSelDest +from .noc_block_regs import NocBlockRegs, NocBlock, StreamEndpointPort, NocBlockPort +from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED + +class StreamSpec: +    """This class carries the configuration parameters of a Tx stream. + +    total_samples, is_continuous, and packet_samples come from the +    radio registers (noc_block_regs.py) + +    sample_rate comes from an rpc to the daughterboard (through the +    set_sample_rate method in chdr_sniffer.py) + +    dst_epid comes from the source stream_ep + +    addr comes from the xport passed through when routing to dst_epid +    """ +    LOW_MASK = 0xFFFFFFFF +    HIGH_MASK = (0xFFFFFFFF) << 32 +    def __init__(self): +        self.total_samples = 0 +        self.is_continuous = True +        self.packet_samples = None +        self.sample_rate = None +        self.dst_epid = None +        self.addr = None + +    def set_num_words_lo(self, low): +        """Set the low 32 bits of the total_samples field""" +        self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \ +            | (low & StreamSpec.LOW_MASK) + +    def set_num_words_hi(self, high): +        """Set the high 32 bits of the total_samples field""" +        self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \ +            | ((high & StreamSpec.LOW_MASK) << 32) + +    def seconds_per_packet(self): +        """Calculates how many seconds should be between each packet +        transmit +        """ +        assert self.packet_samples != 0 +        assert self.sample_rate != 0 +        return self.packet_samples / self.sample_rate + +    def __str__(self): +        return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \ +            " sample_rate: {}, dst_epid: {}, addr: {}}}" \ +            .format(self.total_samples, self.is_continuous, self.packet_samples, +                    self.sample_rate, self.dst_epid, self.addr) + +def to_iter(index_func, length): +    """Allows looping over an indexed object in a for-each loop""" +    for i in range(length): +        yield index_func(i) + +def _swap_src_dst(packet, payload): +    """Swap the src_epid and the dst_epid of a packet""" +    header = packet.get_header() +    our_epid = header.dst_epid +    header.dst_epid = payload.src_epid +    payload.src_epid = our_epid +    packet.set_header(header) + +class NodeType(IntEnum): +    """The type of a node in a NoC Core + +    RTS is a magic value used to determine when to return a packet to +    the sender +    """ +    INVALID = 0 +    XBAR = 1 +    STRM_EP = 2 +    XPORT = 3 +    RTS = 0xFF + +RETURN_TO_SENDER = (0, NodeType.RTS, 0) + +class Node: +    """Represents a node in a RFNoCGraph + +    These objects are usually constructed by the caller of RFNoC Graph. +    Initially, they have references to other nodes as indexes of the +    list of nodes. The graph calls graph_init and from_index so this +    node can initialize their references to node_ids instead. + +    Subclasses can override _handle_<packet_type>_packet methods, and +    _handle_default_packet is provided, which will receive packets whose +    specific methods are not overridden +    """ +    def __init__(self, node_inst): +        """node_inst should start at 0 and increase for every Node of +        the same type that is constructed +        """ +        self.device_id = None +        self.node_inst = node_inst +        self.log = None + +    def graph_init(self, log, device_id, **kwargs): +        """This method is called to initialize the Node Graph""" +        self.device_id = device_id +        self.log = log + +    def get_id(self): +        """Return the NodeID (device_id, node_type, node_inst)""" +        return (self.device_id, self.get_type(), self.node_inst) + +    def get_type(self): +        """Returns the NodeType of this node""" +        raise NotImplementedError + +    def handle_packet(self, packet, **kwargs): +        """Processes a packet + +        The return value should be a node_id or RETURN_TO_SENDER +        """ +        packet_type = packet.get_header().pkt_type +        next_node = None +        if packet_type == PacketType.MGMT: +            next_node = self._handle_mgmt_packet(packet, **kwargs) +        elif packet_type == PacketType.CTRL: +            next_node = self._handle_ctrl_packet(packet, **kwargs) +        elif packet_type in (PacketType.DATA_W_TS, PacketType.DATA_NO_TS): +            next_node = self._handle_data_packet(packet, **kwargs) +        elif packet_type == PacketType.STRS: +            next_node = self._handle_strs_packet(packet, **kwargs) +        elif packet_type == PacketType.STRC: +            next_node = self._handle_strc_packet(packet, **kwargs) +        else: +            raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type)) +        if next_node == NotImplemented: +            next_node = self._handle_default_packet(packet, **kwargs) +        return next_node + +    def _handle_mgmt_packet(self, packet, **kwargs): +        return NotImplemented + +    def _handle_ctrl_packet(self, packet, **kwargs): +        return NotImplemented + +    def _handle_data_packet(self, packet, **kwargs): +        return NotImplemented + +    def _handle_strc_packet(self, packet, **kwargs): +        return NotImplemented + +    def _handle_strs_packet(self, packet, **kwargs): +        return NotImplemented + +    def _handle_default_packet(self, packet, **kwargs): +        raise RuntimeError("{} has no operation defined for a {} packet" +                           .format(self.__class__, packet.get_header().pkt_type)) + +    def from_index(self, nodes): +        """Initialize this node's indexes to block_id references""" +        pass + +    def info_response(self, extended_info): +        """Generate a node info response MgmtOp""" +        return MgmtOp( +            op_payload=MgmtOpNodeInfo(self.device_id, self.get_type(), +                                      self.node_inst, extended_info), +            op_code=MgmtOpCode.INFO_RESP +        ) + +class XportNode(Node): +    """Represents an Xport node + +    When an Advertise Management op is received, the address and +    src_epid of the packet is placed in self.addr_map +    """ +    def __init__(self, node_inst): +        super().__init__(node_inst) +        self.downstream = None +        self.addr_map = {} + +    def get_type(self): +        return NodeType.XPORT + +    def _handle_mgmt_packet(self, packet, addr, **kwargs): +        send_upstream = False +        payload = packet.get_payload_mgmt() +        our_hop = payload.pop_hop() +        packet.set_payload(payload) +        for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): +            if op.op_code == MgmtOpCode.INFO_REQ: +                payload.get_hop(0).add_op(self.info_response(0)) +            elif op.op_code == MgmtOpCode.RETURN: +                send_upstream = True +                _swap_src_dst(packet, payload) +            elif op.op_code == MgmtOpCode.NOP: +                pass +            elif op.op_code == MgmtOpCode.ADVERTISE: +                self.log.info("Advertise: {} | EPID:{} -> EPID:{}" +                              .format(self.get_id(), payload.src_epid, +                                      packet.get_header().dst_epid)) +                self.log.info("addr_map updated: EPID:{} -> {}".format(payload.src_epid, addr)) +                self.addr_map[payload.src_epid] = addr +            else: +                raise NotImplementedError(op.op_code) +        self.log.trace("Xport {} processed hop:\n{}" +                       .format(self.node_inst, our_hop)) +        packet.set_payload(payload) +        if send_upstream: +            return RETURN_TO_SENDER +        else: +            return self.downstream + +    def _handle_default_packet(self, packet, **kwargs): +        return self.downstream + +class XbarNode(Node): +    """Represents a crossbar node + +    self.routing_table stores a mapping of dst_epids to xbar ports + +    port numbers start from 0. xports are addressed first +    stream ep ports are then addressed where +    the index of the first ep = len(xport_ports) + +    NOTE: UHD inteprets the node_inst of a xbar as the port which +    it is connected to. This differs from its handling of node_inst +    for other types of nodes. +    """ +    def __init__(self, node_inst, ports, xport_ports): +        super().__init__(node_inst) +        self.nports = len(ports) + len(xport_ports) +        self.nports_xport = len(xport_ports) +        self.ports = xport_ports + ports +        self.routing_table = {} + +    def get_type(self): +        return NodeType.XBAR + +    def _handle_mgmt_packet(self, packet, **kwargs): +        send_upstream = False +        destination = None +        payload = packet.get_payload_mgmt() +        our_hop = payload.pop_hop() +        for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): +            if op.op_code == MgmtOpCode.INFO_REQ: +                payload.get_hop(0).add_op(self.info_response( +                    (self.nports_xport << 8) | self.nports)) +            elif op.op_code == MgmtOpCode.RETURN: +                send_upstream = True +                _swap_src_dst(packet, payload) +            elif op.op_code == MgmtOpCode.NOP: +                pass +            elif op.op_code == MgmtOpCode.ADVERTISE: +                self.log.info("Advertise: {}".format(self.get_id())) +            elif op.op_code == MgmtOpCode.CFG_WR_REQ: +                cfg = op.get_op_payload() +                cfg = MgmtOpCfg.parse(cfg) +                self.routing_table[cfg.addr] = cfg.data +                self.log.debug("Xbar {} routing changed: {}" +                               .format(self.node_inst, self.routing_table)) +            elif op.op_code == MgmtOpCode.SEL_DEST: +                cfg = op.get_op_payload() +                cfg = MgmtOpSelDest.parse(cfg) +                dest_port = cfg.dest +                destination = self.ports[dest_port] +            else: +                raise NotImplementedError(op.op_code) +        self.log.trace("Xbar {} processed hop:\n{}" +                       .format(self.node_inst, our_hop)) +        packet.set_payload(payload) +        if send_upstream: +            return RETURN_TO_SENDER +        elif destination is not None: +            return destination + +    def _handle_default_packet(self, packet, **kwargs): +        dst_epid = packet.get_header().dst_epid +        if dst_epid not in self.routing_table: +            raise RuntimeError("Xbar no destination for packet (dst_epid: {})".format(dst_epid)) +        return self.ports[self.routing_table[dst_epid]] + +    def from_index(self, nodes): +        """This iterates through the list of nodes and sets the +        reference in self.ports to the node's id +        """ +        for i in range(len(self.ports)): +            nodes_index = self.ports[i] +            node = nodes[nodes_index] +            self.ports[i] = node.get_id() +            if node.__class__ is XportNode: +                node.downstream = self.get_id() +            elif node.__class__ is StreamEndpointNode: +                node.upstream = self.get_id() + +class StreamEndpointNode(Node): +    """Represents a Stream endpoint node + +    This class contains a StreamEpRegs object. To clarify, management +    packets access these registers, while control packets access the +    registers of the noc_blocks which are held in the RFNoCGraph and +    passed into handle_packet as the regs parameter +    """ +    def __init__(self, node_inst): +        super().__init__(node_inst) +        self.epid = node_inst +        self.dst_epid = None +        self.upstream = None +        self.sep_config = None +        # These 4 aren't configurable right now +        self.has_data = True +        self.has_ctrl = True +        self.input_ports = 2 +        self.output_ports = 2 +        self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, +                                    self.status_callback_out, self.status_callback_in, 4, 4 * 8000) + +    def status_callback_out(self, status): +        """Called by the ep_regs when on a write to the +        REG_OSTRM_CTRL_STATUS register +        """ +        if status.cfg_start: +            # This only creates a new TxWorker if the cfg_start flag is set +            self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) +            self.sep_config(self, True) +        return STRM_STATUS_FC_ENABLED + +    def status_callback_in(self, status): +        """Called by the ep_regs on a write to the +        REG_OSTRM_CTRL_STATUS register +        """ +        # This always triggers the graph to create a new RxWorker +        self.sep_config(self, False) +        return STRM_STATUS_FC_ENABLED + +    def graph_init(self, log, device_id, sep_config, **kwargs): +        super().graph_init(log, device_id) +        self.ep_regs.log = log +        self.sep_config = sep_config + +    def get_type(self): +        return NodeType.STRM_EP + +    def get_epid(self): +        return self.epid + +    def set_epid(self, epid): +        self.epid = epid + +    def set_dst_epid(self, dst_epid): +        self.dst_epid = dst_epid + +    def _handle_mgmt_packet(self, packet, regs, **kwargs): +        send_upstream = False +        payload = packet.get_payload_mgmt() +        our_hop = payload.pop_hop() +        for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): +            if op.op_code == MgmtOpCode.INFO_REQ: +                ext_info = 0 +                ext_info |= (1 if self.has_ctrl else 0) << 0 +                ext_info |= (1 if self.has_data else 0) << 1 +                ext_info |= (self.input_ports & 0x3F) << 2 +                ext_info |= (self.output_ports & 0x3F) << 8 +                payload.get_hop(0).add_op(self.info_response(ext_info)) +            elif op.op_code == MgmtOpCode.RETURN: +                send_upstream = True +                _swap_src_dst(packet, payload) +            elif op.op_code == MgmtOpCode.NOP: +                pass +            elif op.op_code == MgmtOpCode.CFG_RD_REQ: +                request = MgmtOpCfg.parse(op.get_op_payload()) +                value = self.ep_regs.read(request.addr) +                payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP, +                                                 MgmtOpCfg(request.addr, value))) +            elif op.op_code == MgmtOpCode.CFG_WR_REQ: +                request = MgmtOpCfg.parse(op.get_op_payload()) +                self.ep_regs.write(request.addr, request.data) +            else: +                raise NotImplementedError("op_code {} is not implemented for " +                                          "StreamEndpointNode".format(op.op_code)) +        self.log.trace("Stream Endpoint {} processed hop:\n{}" +                       .format(self.node_inst, our_hop)) +        packet.set_payload(payload) +        if send_upstream: +            return RETURN_TO_SENDER + +    def _handle_ctrl_packet(self, packet, regs, **kwargs): +        payload = packet.get_payload_ctrl() +        _swap_src_dst(packet, payload) +        if payload.status != CtrlStatus.OKAY: +            raise RuntimeError("Control Status not OK: {}".format(payload.status)) +        if payload.op_code == CtrlOpCode.READ: +            payload.is_ack = True +            payload.set_data([regs.read(payload.address)]) +        elif payload.op_code == CtrlOpCode.WRITE: +            payload.is_ack = True +            regs.write(payload.address, payload.get_data()[0]) +        else: +            raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code)) +        packet.set_payload(payload) +        return RETURN_TO_SENDER + +class RFNoCGraph: +    """This class holds all of the nodes of the NoC core and the Noc +    blocks. + +    It serves as an interface between the ChdrSniffer and the +    individual blocks/nodes. +    """ +    def __init__(self, graph_list, log, device_id, create_tx_func, +                 stop_tx_func, send_strc_func, create_rx_func): +        self.log = log.getChild("Graph") +        self.send_strc = send_strc_func +        self.device_id = device_id +        self.stream_spec = StreamSpec() +        self.create_tx = create_tx_func +        self.stop_tx = stop_tx_func +        self.create_rx = create_rx_func +        num_stream_ep = 0 +        for node in graph_list: +            if node.__class__ is StreamEndpointNode: +                num_stream_ep += 1 +            node.graph_init(self.log, device_id, sep_config=self.prepare_stream_ep) +        # These must be done sequentially so that device_id is initialized on all nodes +        # before from_index is called on any node +        for node in graph_list: +            node.from_index(graph_list) +        self.graph_map = {node.get_id(): node +                          for node in graph_list} +        # For now, just use one radio block and hardcode it to the first stream endpoint +        radio = NocBlock(1 << 16, 2, 2, 512, 1, 0x12AD1000, 16) +        adj_list = [ +            (StreamEndpointPort(0, 0), NocBlockPort(0, 0)), +            (StreamEndpointPort(0, 1), NocBlockPort(0, 1)), +            (NocBlockPort(0, 0), StreamEndpointPort(0, 0)), +            (NocBlockPort(0, 1), StreamEndpointPort(0, 1)) +        ] +        self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], num_stream_ep, 1, 0xE320, +                                 adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd, +                                 self.radio_tx_stop) + +    def radio_tx_cmd(self, sep_block_id): +        """Triggers the creation of a TxWorker in the ChdrSniffer using +        the current stream_spec. + +        This method transforms the sep_block_id into an epid useable by +        the transmit code +        """ +        # TODO: Use the port +        sep_blk, sep_port = sep_block_id +        # The NoC Block index for stream endpoints is the inst + 1 +        # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() +        sep_inst = sep_blk - 1 +        sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) +        stream_ep = self.graph_map[sep_id] +        self.stream_spec.dst_epid = stream_ep.dst_epid +        self.stream_spec.addr = self.dst_to_addr(stream_ep) +        self.log.info("Streaming with StreamSpec:") +        self.log.info(str(self.stream_spec)) +        self.create_tx(stream_ep.epid, self.stream_spec) +        self.stream_spec = StreamSpec() + +    def radio_tx_stop(self, sep_block_id): +        """Triggers the destuction of a TxWorker in the ChdrSniffer + +        This method transforms the sep_block_id into an epid useable by +        the transmit code +        """ +        sep_blk, sep_port = sep_block_id +        sep_inst = sep_blk - 1 +        # The NoC Block index for stream endpoints is the inst + 1 +        # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() +        sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) +        src_epid = self.graph_map[sep_id].epid +        self.stop_tx(src_epid) + +    def get_device_id(self): +        return self.device_id + +    def prepare_stream_ep(self, stream_ep, is_tx): +        """This is called by a stream_ep when it receives a status update + +        Depending on whether it was a tx or rx status, it will either +        trigger an strc packet or an rx stream +        """ +        if is_tx: +            addr = self.dst_to_addr(stream_ep) +            self.send_strc(stream_ep, addr) +        else: +            self.create_rx(stream_ep.epid) + +    def change_spp(self, spp): +        self.stream_spec.packet_samples = spp + +    def find_ep_by_id(self, epid): +        """Find a Stream Endpoint which identifies with epid""" +        for node in self.graph_map.values(): +            if node.__class__ is StreamEndpointNode: +                if node.epid == epid: +                    return node + +    # Fixme: This doesn't support intra-device connections +    # i.e. connecting nodes by connecting two internal stream endpoints +    # +    # That would require looking at the adjacency list if we end up at +    # another stream endpoint instead of an xport +    def dst_to_addr(self, src_ep): +        """This function traverses backwards through the node graph, +        starting from src_ep and taking the path traveled by a packet +        heading for src_ep.dst_epid. When it encounters an xport, it +        returns the address associated with the dst_epid in the xport's +        addr_map +        """ +        current_node = src_ep +        dst_epid = src_ep.dst_epid +        while current_node.__class__ != XportNode: +            if current_node.__class__ == StreamEndpointNode: +                current_node = self.graph_map[current_node.upstream] +            else: +                # current_node is a xbar +                port = current_node.routing_table[dst_epid] +                upstream_id = current_node.ports[port] +                current_node = self.graph_map[upstream_id] +        return current_node.addr_map[dst_epid] + +    def handle_packet(self, packet, xport_input, addr): +        """Given a chdr_packet, the id of an xport node to serve as an +        entry point, and a source address, send the packet through the +        node graph. +        """ +        node_id = xport_input +        response_packet = None +        while node_id is not None: +            if node_id[1] == NodeType.RTS: +                response_packet = packet +                break +            node = self.graph_map[node_id] +            # If the node returns a value, it is the node id of the +            # node the packet should be passed to next +            # or RETURN_TO_SENDER +            node_id = node.handle_packet(packet, regs=self.regs, addr=addr) +        return response_packet + +    def get_stream_spec(self): +        return self.stream_spec diff --git a/mpm/python/usrp_mpm/simulator/stream_ep_regs.py b/mpm/python/usrp_mpm/simulator/stream_ep_regs.py new file mode 100644 index 000000000..defed345f --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/stream_ep_regs.py @@ -0,0 +1,129 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module holds the Emulated registers for a Stream Endpoint Node. +One of these objects is instantiated for each Stream Endpoint Node. +""" +from enum import IntEnum + +REG_EPID_SELF = 0x00 # RW +REG_RESET_AND_FLUSH = 0x04 # W +REG_OSTRM_CTRL_STATUS = 0x08 # RW +REG_OSTRM_DST_EPID = 0x0C # W +REG_OSTRM_FC_FREQ_BYTES_LO = 0x10 # W +REG_OSTRM_FC_FREQ_BYTES_HI = 0x14 # W +REG_OSTRM_FC_FREQ_PKTS = 0x18 # W +REG_OSTRM_FC_HEADROOM = 0x1C # W +REG_OSTRM_BUFF_CAP_BYTES_LO = 0x20 # R +REG_OSTRM_BUFF_CAP_BYTES_HI = 0x24 # R +REG_OSTRM_BUFF_CAP_PKTS = 0x28 # R +REG_OSTRM_SEQ_ERR_CNT = 0x2C # R +REG_OSTRM_DATA_ERR_CNT = 0x30 # R +REG_OSTRM_ROUTE_ERR_CNT = 0x34 # R +REG_ISTRM_CTRL_STATUS = 0x38 # RW + +RESET_AND_FLUSH_OSTRM = (1 << 0) +RESET_AND_FLUSH_ISTRM = (1 << 1) +RESET_AND_FLUSH_CTRL = (1 << 2) +RESET_AND_FLUSH_ALL = 0x7 + +STRM_STATUS_FC_ENABLED = 0x80000000 +STRM_STATUS_SETUP_ERR = 0x40000000 +STRM_STATUS_SETUP_PENDING = 0x20000000 + +class SwBuff(IntEnum): +    """The size of the elements in a buffer""" +    BUFF_U64 = 0 +    BUFF_U32 = 1 +    BUFF_U16 = 2 +    BUFF_U8 = 3 + +class CtrlStatusWord: +    """Represents a Control Status Word + +    See mgmt_portal:BUILD_CTRL_STATUS_WORD() +    """ +    def __init__(self, cfg_start, xport_lossy, pyld_buff_fmt, mdata_buff_fmt, byte_swap): +        self.cfg_start = cfg_start +        self.xport_lossy = xport_lossy +        self.pyld_buff_fmt = pyld_buff_fmt +        self.mdata_buff_fmt = mdata_buff_fmt +        self.byte_swap = byte_swap + +    @classmethod +    def parse(cls, val): +        cfg_start = (val & 1) != 0 +        xport_lossy = ((val >> 1) & 1) != 0 +        pyld_buff_fmt = SwBuff((val >> 2) & 3) +        mdata_buff_fmt = SwBuff((val >> 4) & 3) +        byte_swap = ((val >> 6) & 1) != 0 +        return cls(cfg_start, xport_lossy, pyld_buff_fmt, mdata_buff_fmt, byte_swap) + +    def __str__(self): +        return "CtrlStatusWord{{cfg_start: {}, xport_lossy: {}, " \ +               "pyld_buff_fmt: {}, mdata_buff_fmt: {}, byte_swap: {}}}" \ +               .format(self.cfg_start, self.xport_lossy, +                       self.pyld_buff_fmt, self.mdata_buff_fmt, self.byte_swap) + +class StreamEpRegs: +    """Represents a set of registers associated with a stream endpoint +    which can be accessed through management packets + +    See mgmt_portal.cpp +    """ +    def __init__(self, get_epid, set_epid, set_dst_epid, update_status_out, +                 update_status_in, cap_pkts, cap_bytes): +        self.get_epid = get_epid +        self.set_epid = set_epid +        self.set_dst_epid = set_dst_epid +        self.log = None +        self.out_ctrl_status = 0 +        self.in_ctrl_status = 0 +        self.update_status_out = update_status_out +        self.update_status_in = update_status_in +        self.cap_pkts = cap_pkts +        self.cap_bytes = cap_bytes + +    def read(self, addr): +        if addr == REG_EPID_SELF: +            return self.get_epid() +        elif addr == REG_OSTRM_CTRL_STATUS: +            return self.out_ctrl_status +        elif addr == REG_ISTRM_CTRL_STATUS: +            return self.in_ctrl_status +        elif addr == REG_OSTRM_BUFF_CAP_BYTES_LO: +            return self.cap_bytes & 0xFFFFFFFF +        elif addr == REG_OSTRM_BUFF_CAP_BYTES_HI: +            return (self.cap_bytes >> 32) & 0xFFFFFFFF +        elif addr == REG_OSTRM_BUFF_CAP_PKTS: +            return self.cap_pkts +        else: +            raise NotImplementedError("Unable to read addr 0x{:08X} from stream ep regs" +                                      .format(addr)) + +    def write(self, addr, val): +        if addr == REG_EPID_SELF: +            self.log.debug("Setting EPID to {}".format(val)) +            self.set_epid(val) +        elif addr == REG_OSTRM_CTRL_STATUS: +            status = CtrlStatusWord.parse(val) +            self.log.debug("Setting EPID Output Stream Ctrl Status: {}".format(status)) +            new_status = self.update_status_out(status) +            self.out_ctrl_status = new_status if new_status is not None else val +        elif addr == REG_RESET_AND_FLUSH: +            self.log.trace("Stream EP Regs Reset and Flush") +        elif addr == REG_OSTRM_DST_EPID: +            self.log.debug("Setting Dest EPID to {}".format(val)) +            self.set_dst_epid(val) +        elif REG_OSTRM_FC_FREQ_BYTES_LO <= addr <= REG_OSTRM_FC_HEADROOM: +            pass # TODO: implement these Flow Control parameters +        elif addr == REG_ISTRM_CTRL_STATUS: +            status = CtrlStatusWord.parse(val) +            self.log.debug("Setting EPID Input Stream Ctrl Status: {}".format(status)) +            new_status = self.update_status_in(status) +            self.in_ctrl_status = new_status if new_status is not None else val +        else: +            raise NotImplementedError("Unable to write addr 0x{:08X} from stream ep regs" +                                      .format(addr)) | 
