diff options
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/rfnoc_graph.py')
-rw-r--r-- | mpm/python/usrp_mpm/simulator/rfnoc_graph.py | 554 |
1 files changed, 554 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py new file mode 100644 index 000000000..42210ab6e --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py @@ -0,0 +1,554 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module includes all of the components necessary to simulate the +configuration of a NoC Core and NoC Blocks. + +This module handles and responds to Management and Ctrl Packets. It +also instantiates the registers and acts as an interface between +the chdr packets on the network and the registers. +""" +from enum import IntEnum +from uhd.chdr import PacketType, MgmtOp, MgmtOpCode, MgmtOpNodeInfo, \ + CtrlStatus, CtrlOpCode, MgmtOpCfg, MgmtOpSelDest +from .noc_block_regs import NocBlockRegs, NocBlock, StreamEndpointPort, NocBlockPort +from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED + +class StreamSpec: + """This class carries the configuration parameters of a Tx stream. + + total_samples, is_continuous, and packet_samples come from the + radio registers (noc_block_regs.py) + + sample_rate comes from an rpc to the daughterboard (through the + set_sample_rate method in chdr_sniffer.py) + + dst_epid comes from the source stream_ep + + addr comes from the xport passed through when routing to dst_epid + """ + LOW_MASK = 0xFFFFFFFF + HIGH_MASK = (0xFFFFFFFF) << 32 + def __init__(self): + self.total_samples = 0 + self.is_continuous = True + self.packet_samples = None + self.sample_rate = None + self.dst_epid = None + self.addr = None + + def set_num_words_lo(self, low): + """Set the low 32 bits of the total_samples field""" + self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \ + | (low & StreamSpec.LOW_MASK) + + def set_num_words_hi(self, high): + """Set the high 32 bits of the total_samples field""" + self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \ + | ((high & StreamSpec.LOW_MASK) << 32) + + def seconds_per_packet(self): + """Calculates how many seconds should be between each packet + transmit + """ + assert self.packet_samples != 0 + assert self.sample_rate != 0 + return self.packet_samples / self.sample_rate + + def __str__(self): + return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \ + " sample_rate: {}, dst_epid: {}, addr: {}}}" \ + .format(self.total_samples, self.is_continuous, self.packet_samples, + self.sample_rate, self.dst_epid, self.addr) + +def to_iter(index_func, length): + """Allows looping over an indexed object in a for-each loop""" + for i in range(length): + yield index_func(i) + +def _swap_src_dst(packet, payload): + """Swap the src_epid and the dst_epid of a packet""" + header = packet.get_header() + our_epid = header.dst_epid + header.dst_epid = payload.src_epid + payload.src_epid = our_epid + packet.set_header(header) + +class NodeType(IntEnum): + """The type of a node in a NoC Core + + RTS is a magic value used to determine when to return a packet to + the sender + """ + INVALID = 0 + XBAR = 1 + STRM_EP = 2 + XPORT = 3 + RTS = 0xFF + +RETURN_TO_SENDER = (0, NodeType.RTS, 0) + +class Node: + """Represents a node in a RFNoCGraph + + These objects are usually constructed by the caller of RFNoC Graph. + Initially, they have references to other nodes as indexes of the + list of nodes. The graph calls graph_init and from_index so this + node can initialize their references to node_ids instead. + + Subclasses can override _handle_<packet_type>_packet methods, and + _handle_default_packet is provided, which will receive packets whose + specific methods are not overridden + """ + def __init__(self, node_inst): + """node_inst should start at 0 and increase for every Node of + the same type that is constructed + """ + self.device_id = None + self.node_inst = node_inst + self.log = None + + def graph_init(self, log, device_id, **kwargs): + """This method is called to initialize the Node Graph""" + self.device_id = device_id + self.log = log + + def get_id(self): + """Return the NodeID (device_id, node_type, node_inst)""" + return (self.device_id, self.get_type(), self.node_inst) + + def get_type(self): + """Returns the NodeType of this node""" + raise NotImplementedError + + def handle_packet(self, packet, **kwargs): + """Processes a packet + + The return value should be a node_id or RETURN_TO_SENDER + """ + packet_type = packet.get_header().pkt_type + next_node = None + if packet_type == PacketType.MGMT: + next_node = self._handle_mgmt_packet(packet, **kwargs) + elif packet_type == PacketType.CTRL: + next_node = self._handle_ctrl_packet(packet, **kwargs) + elif packet_type in (PacketType.DATA_W_TS, PacketType.DATA_NO_TS): + next_node = self._handle_data_packet(packet, **kwargs) + elif packet_type == PacketType.STRS: + next_node = self._handle_strs_packet(packet, **kwargs) + elif packet_type == PacketType.STRC: + next_node = self._handle_strc_packet(packet, **kwargs) + else: + raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type)) + if next_node == NotImplemented: + next_node = self._handle_default_packet(packet, **kwargs) + return next_node + + def _handle_mgmt_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_ctrl_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_data_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_strc_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_strs_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_default_packet(self, packet, **kwargs): + raise RuntimeError("{} has no operation defined for a {} packet" + .format(self.__class__, packet.get_header().pkt_type)) + + def from_index(self, nodes): + """Initialize this node's indexes to block_id references""" + pass + + def info_response(self, extended_info): + """Generate a node info response MgmtOp""" + return MgmtOp( + op_payload=MgmtOpNodeInfo(self.device_id, self.get_type(), + self.node_inst, extended_info), + op_code=MgmtOpCode.INFO_RESP + ) + +class XportNode(Node): + """Represents an Xport node + + When an Advertise Management op is received, the address and + src_epid of the packet is placed in self.addr_map + """ + def __init__(self, node_inst): + super().__init__(node_inst) + self.downstream = None + self.addr_map = {} + + def get_type(self): + return NodeType.XPORT + + def _handle_mgmt_packet(self, packet, addr, **kwargs): + send_upstream = False + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + packet.set_payload(payload) + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + payload.get_hop(0).add_op(self.info_response(0)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.ADVERTISE: + self.log.info("Advertise: {} | EPID:{} -> EPID:{}" + .format(self.get_id(), payload.src_epid, + packet.get_header().dst_epid)) + self.log.info("addr_map updated: EPID:{} -> {}".format(payload.src_epid, addr)) + self.addr_map[payload.src_epid] = addr + else: + raise NotImplementedError(op.op_code) + self.log.trace("Xport {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + else: + return self.downstream + + def _handle_default_packet(self, packet, **kwargs): + return self.downstream + +class XbarNode(Node): + """Represents a crossbar node + + self.routing_table stores a mapping of dst_epids to xbar ports + + port numbers start from 0. xports are addressed first + stream ep ports are then addressed where + the index of the first ep = len(xport_ports) + + NOTE: UHD inteprets the node_inst of a xbar as the port which + it is connected to. This differs from its handling of node_inst + for other types of nodes. + """ + def __init__(self, node_inst, ports, xport_ports): + super().__init__(node_inst) + self.nports = len(ports) + len(xport_ports) + self.nports_xport = len(xport_ports) + self.ports = xport_ports + ports + self.routing_table = {} + + def get_type(self): + return NodeType.XBAR + + def _handle_mgmt_packet(self, packet, **kwargs): + send_upstream = False + destination = None + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + payload.get_hop(0).add_op(self.info_response( + (self.nports_xport << 8) | self.nports)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.ADVERTISE: + self.log.info("Advertise: {}".format(self.get_id())) + elif op.op_code == MgmtOpCode.CFG_WR_REQ: + cfg = op.get_op_payload() + cfg = MgmtOpCfg.parse(cfg) + self.routing_table[cfg.addr] = cfg.data + self.log.debug("Xbar {} routing changed: {}" + .format(self.node_inst, self.routing_table)) + elif op.op_code == MgmtOpCode.SEL_DEST: + cfg = op.get_op_payload() + cfg = MgmtOpSelDest.parse(cfg) + dest_port = cfg.dest + destination = self.ports[dest_port] + else: + raise NotImplementedError(op.op_code) + self.log.trace("Xbar {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + elif destination is not None: + return destination + + def _handle_default_packet(self, packet, **kwargs): + dst_epid = packet.get_header().dst_epid + if dst_epid not in self.routing_table: + raise RuntimeError("Xbar no destination for packet (dst_epid: {})".format(dst_epid)) + return self.ports[self.routing_table[dst_epid]] + + def from_index(self, nodes): + """This iterates through the list of nodes and sets the + reference in self.ports to the node's id + """ + for i in range(len(self.ports)): + nodes_index = self.ports[i] + node = nodes[nodes_index] + self.ports[i] = node.get_id() + if node.__class__ is XportNode: + node.downstream = self.get_id() + elif node.__class__ is StreamEndpointNode: + node.upstream = self.get_id() + +class StreamEndpointNode(Node): + """Represents a Stream endpoint node + + This class contains a StreamEpRegs object. To clarify, management + packets access these registers, while control packets access the + registers of the noc_blocks which are held in the RFNoCGraph and + passed into handle_packet as the regs parameter + """ + def __init__(self, node_inst): + super().__init__(node_inst) + self.epid = node_inst + self.dst_epid = None + self.upstream = None + self.sep_config = None + # These 4 aren't configurable right now + self.has_data = True + self.has_ctrl = True + self.input_ports = 2 + self.output_ports = 2 + self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, + self.status_callback_out, self.status_callback_in, 4, 4 * 8000) + + def status_callback_out(self, status): + """Called by the ep_regs when on a write to the + REG_OSTRM_CTRL_STATUS register + """ + if status.cfg_start: + # This only creates a new TxWorker if the cfg_start flag is set + self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) + self.sep_config(self, True) + return STRM_STATUS_FC_ENABLED + + def status_callback_in(self, status): + """Called by the ep_regs on a write to the + REG_OSTRM_CTRL_STATUS register + """ + # This always triggers the graph to create a new RxWorker + self.sep_config(self, False) + return STRM_STATUS_FC_ENABLED + + def graph_init(self, log, device_id, sep_config, **kwargs): + super().graph_init(log, device_id) + self.ep_regs.log = log + self.sep_config = sep_config + + def get_type(self): + return NodeType.STRM_EP + + def get_epid(self): + return self.epid + + def set_epid(self, epid): + self.epid = epid + + def set_dst_epid(self, dst_epid): + self.dst_epid = dst_epid + + def _handle_mgmt_packet(self, packet, regs, **kwargs): + send_upstream = False + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + ext_info = 0 + ext_info |= (1 if self.has_ctrl else 0) << 0 + ext_info |= (1 if self.has_data else 0) << 1 + ext_info |= (self.input_ports & 0x3F) << 2 + ext_info |= (self.output_ports & 0x3F) << 8 + payload.get_hop(0).add_op(self.info_response(ext_info)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.CFG_RD_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + value = self.ep_regs.read(request.addr) + payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP, + MgmtOpCfg(request.addr, value))) + elif op.op_code == MgmtOpCode.CFG_WR_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + self.ep_regs.write(request.addr, request.data) + else: + raise NotImplementedError("op_code {} is not implemented for " + "StreamEndpointNode".format(op.op_code)) + self.log.trace("Stream Endpoint {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + + def _handle_ctrl_packet(self, packet, regs, **kwargs): + payload = packet.get_payload_ctrl() + _swap_src_dst(packet, payload) + if payload.status != CtrlStatus.OKAY: + raise RuntimeError("Control Status not OK: {}".format(payload.status)) + if payload.op_code == CtrlOpCode.READ: + payload.is_ack = True + payload.set_data([regs.read(payload.address)]) + elif payload.op_code == CtrlOpCode.WRITE: + payload.is_ack = True + regs.write(payload.address, payload.get_data()[0]) + else: + raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code)) + packet.set_payload(payload) + return RETURN_TO_SENDER + +class RFNoCGraph: + """This class holds all of the nodes of the NoC core and the Noc + blocks. + + It serves as an interface between the ChdrSniffer and the + individual blocks/nodes. + """ + def __init__(self, graph_list, log, device_id, create_tx_func, + stop_tx_func, send_strc_func, create_rx_func): + self.log = log.getChild("Graph") + self.send_strc = send_strc_func + self.device_id = device_id + self.stream_spec = StreamSpec() + self.create_tx = create_tx_func + self.stop_tx = stop_tx_func + self.create_rx = create_rx_func + num_stream_ep = 0 + for node in graph_list: + if node.__class__ is StreamEndpointNode: + num_stream_ep += 1 + node.graph_init(self.log, device_id, sep_config=self.prepare_stream_ep) + # These must be done sequentially so that device_id is initialized on all nodes + # before from_index is called on any node + for node in graph_list: + node.from_index(graph_list) + self.graph_map = {node.get_id(): node + for node in graph_list} + # For now, just use one radio block and hardcode it to the first stream endpoint + radio = NocBlock(1 << 16, 2, 2, 512, 1, 0x12AD1000, 16) + adj_list = [ + (StreamEndpointPort(0, 0), NocBlockPort(0, 0)), + (StreamEndpointPort(0, 1), NocBlockPort(0, 1)), + (NocBlockPort(0, 0), StreamEndpointPort(0, 0)), + (NocBlockPort(0, 1), StreamEndpointPort(0, 1)) + ] + self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], num_stream_ep, 1, 0xE320, + adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd, + self.radio_tx_stop) + + def radio_tx_cmd(self, sep_block_id): + """Triggers the creation of a TxWorker in the ChdrSniffer using + the current stream_spec. + + This method transforms the sep_block_id into an epid useable by + the transmit code + """ + # TODO: Use the port + sep_blk, sep_port = sep_block_id + # The NoC Block index for stream endpoints is the inst + 1 + # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() + sep_inst = sep_blk - 1 + sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) + stream_ep = self.graph_map[sep_id] + self.stream_spec.dst_epid = stream_ep.dst_epid + self.stream_spec.addr = self.dst_to_addr(stream_ep) + self.log.info("Streaming with StreamSpec:") + self.log.info(str(self.stream_spec)) + self.create_tx(stream_ep.epid, self.stream_spec) + self.stream_spec = StreamSpec() + + def radio_tx_stop(self, sep_block_id): + """Triggers the destuction of a TxWorker in the ChdrSniffer + + This method transforms the sep_block_id into an epid useable by + the transmit code + """ + sep_blk, sep_port = sep_block_id + sep_inst = sep_blk - 1 + # The NoC Block index for stream endpoints is the inst + 1 + # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() + sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) + src_epid = self.graph_map[sep_id].epid + self.stop_tx(src_epid) + + def get_device_id(self): + return self.device_id + + def prepare_stream_ep(self, stream_ep, is_tx): + """This is called by a stream_ep when it receives a status update + + Depending on whether it was a tx or rx status, it will either + trigger an strc packet or an rx stream + """ + if is_tx: + addr = self.dst_to_addr(stream_ep) + self.send_strc(stream_ep, addr) + else: + self.create_rx(stream_ep.epid) + + def change_spp(self, spp): + self.stream_spec.packet_samples = spp + + def find_ep_by_id(self, epid): + """Find a Stream Endpoint which identifies with epid""" + for node in self.graph_map.values(): + if node.__class__ is StreamEndpointNode: + if node.epid == epid: + return node + + # Fixme: This doesn't support intra-device connections + # i.e. connecting nodes by connecting two internal stream endpoints + # + # That would require looking at the adjacency list if we end up at + # another stream endpoint instead of an xport + def dst_to_addr(self, src_ep): + """This function traverses backwards through the node graph, + starting from src_ep and taking the path traveled by a packet + heading for src_ep.dst_epid. When it encounters an xport, it + returns the address associated with the dst_epid in the xport's + addr_map + """ + current_node = src_ep + dst_epid = src_ep.dst_epid + while current_node.__class__ != XportNode: + if current_node.__class__ == StreamEndpointNode: + current_node = self.graph_map[current_node.upstream] + else: + # current_node is a xbar + port = current_node.routing_table[dst_epid] + upstream_id = current_node.ports[port] + current_node = self.graph_map[upstream_id] + return current_node.addr_map[dst_epid] + + def handle_packet(self, packet, xport_input, addr): + """Given a chdr_packet, the id of an xport node to serve as an + entry point, and a source address, send the packet through the + node graph. + """ + node_id = xport_input + response_packet = None + while node_id is not None: + if node_id[1] == NodeType.RTS: + response_packet = packet + break + node = self.graph_map[node_id] + # If the node returns a value, it is the node id of the + # node the packet should be passed to next + # or RETURN_TO_SENDER + node_id = node.handle_packet(packet, regs=self.regs, addr=addr) + return response_packet + + def get_stream_spec(self): + return self.stream_spec |