diff options
Diffstat (limited to 'mpm/python')
-rw-r--r-- | mpm/python/usrp_mpm/periph_manager/sim.py | 13 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/CMakeLists.txt | 4 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_sniffer.py | 107 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/noc_block_regs.py | 312 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/rfnoc_graph.py | 554 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/stream_ep_regs.py | 129 |
6 files changed, 1114 insertions, 5 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/sim.py b/mpm/python/usrp_mpm/periph_manager/sim.py index 7c8baeb8f..7f9a610a7 100644 --- a/mpm/python/usrp_mpm/periph_manager/sim.py +++ b/mpm/python/usrp_mpm/periph_manager/sim.py @@ -16,6 +16,7 @@ from usrp_mpm.mpmlog import get_logger from usrp_mpm.rpc_server import no_claim from usrp_mpm.periph_manager import PeriphManagerBase from usrp_mpm.simulator.sim_dboard_catalina import SimulatedCatalinaDboard +from usrp_mpm.simulator.chdr_sniffer import ChdrSniffer CLOCK_SOURCE_INTERNAL = "internal" @@ -67,8 +68,6 @@ class sim(PeriphManagerBase): """ ######################################################################### # Overridables - # - # See PeriphManagerBase for documentation on these fields ######################################################################### description = "E320-Series Device - SIMULATED" pids = {0xE320: 'e320'} @@ -84,6 +83,8 @@ class sim(PeriphManagerBase): super().__init__() self.device_id = 1 + self.chdr_sniffer = ChdrSniffer(self.log, args) + # Unlike the real hardware drivers, if there is an exception here, # we just crash. No use missing an error when testing. self._init_peripherals(args) @@ -91,8 +92,9 @@ class sim(PeriphManagerBase): if not args.get('skip_boot_init', False): self.init(args) - def _simulator_frequency(self, freq): - self.log.debug("Setting Simulator Sample Frequency to {}".format(freq)) + def _simulator_sample_rate(self, freq): + self.log.debug("Setting Simulator Sample Rate to {}".format(freq)) + self.chdr_endpoint.set_sample_rate(freq) @classmethod def generate_device_info(cls, eeprom_md, mboard_info, dboard_infos): @@ -148,7 +150,8 @@ class sim(PeriphManagerBase): self.log.debug("Device info: {}".format(self.device_info)) def _init_dboards(self, dboard_infos, override_dboard_pids, default_args): - self.dboards.append(SimulatedCatalinaDboard(E320_DBOARD_SLOT_IDX, self._simulator_frequency)) + self.dboards.append(SimulatedCatalinaDboard( + E320_DBOARD_SLOT_IDX, self._simulator_sample_rate)) self.log.info("Found %d daughterboard(s).", len(self.dboards)) ########################################################################### diff --git a/mpm/python/usrp_mpm/simulator/CMakeLists.txt b/mpm/python/usrp_mpm/simulator/CMakeLists.txt index 6cc4ee441..82fcbd801 100644 --- a/mpm/python/usrp_mpm/simulator/CMakeLists.txt +++ b/mpm/python/usrp_mpm/simulator/CMakeLists.txt @@ -13,6 +13,10 @@ set(USRP_MPM_SIMULATOR_FILES ${CMAKE_CURRENT_SOURCE_DIR}/__init__.py ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard.py ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard_catalina.py + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_sniffer.py + ${CMAKE_CURRENT_SOURCE_DIR}/noc_block_regs.py + ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_graph.py + ${CMAKE_CURRENT_SOURCE_DIR}/stream_ep_regs.py ) list(APPEND USRP_MPM_FILES ${USRP_MPM_SIMULATOR_FILES}) set(USRP_MPM_FILES ${USRP_MPM_FILES} PARENT_SCOPE) diff --git a/mpm/python/usrp_mpm/simulator/chdr_sniffer.py b/mpm/python/usrp_mpm/simulator/chdr_sniffer.py new file mode 100644 index 000000000..8ff467cf2 --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/chdr_sniffer.py @@ -0,0 +1,107 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module houses the ChdrSniffer class, which handles networking, +packet dispatch, and the nitty gritty of spinning up rx and tx workers. + +Note: Rx and Tx are reversed when compared to their definitions in the +UHD radio_control_impl.cpp file. Rx is when samples are coming to the +simulator. Tx is when samples are being sent by the simulator. + +TODO: This class is run based on threads for development simplicity. If +more throughput is desired, it should be rewritten to use processes +instead. This allows the socket workers to truly work in parallel. +Python threads are limited by holding the GIL while they are executing. +""" + +from threading import Thread +import socket +from uhd.chdr import ChdrPacket, ChdrWidth, PacketType +from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType + +CHDR_W = ChdrWidth.W64 + +class ChdrSniffer: + """This class is created by the sim periph_manager + It is responsible for opening sockets, dispatching all chdr packet + traffic to the appropriate destination, and responding to said + traffic. + + The extra_args parameter is passed in from the periph_manager, and + coresponds to the --default_args flag of usrp_hwd.py on the + command line + """ + def __init__(self, log, extra_args): + self.log = log.getChild("ChdrSniffer") + self.thread = Thread(target=self.socket_worker, daemon=True) + self.thread.start() + self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx, + self.end_tx, self.send_strc, self.begin_rx) + self.xport_map = {} + + def set_sample_rate(self, rate): + """Set the sample_rate of the next tx_stream. + + This method is called by the daughterboard. It coresponds to + sim_dboard.py:sim_db#set_catalina_clock_rate() + """ + self.graph.get_stream_spec().sample_rate = rate + + def get_default_nodes(self): + """Get a sensible NoC Core setup. This is the simplest + functional layout. It has one of each required component. + """ + nodes = [ + XportNode(0), + XbarNode(0, [2], [0]), + StreamEndpointNode(0) + ] + return nodes + + def send_strc(self, stream_ep, addr): + pass # TODO: currently not implemented + + def begin_tx(self, src_epid, stream_spec): + pass # TODO: currently not implemented + + def end_tx(self, src_epid): + pass # TODO: currently not implemented + + def begin_rx(self, dst_epid): + pass # TODO: currently not implemented + + def socket_worker(self): + """This is the method that runs in a background thread. It + blocks on the CHDR socket and processes packets as they come + in. + """ + self.log.info("Starting ChdrSniffer Thread") + main_sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM) + main_sock.bind(("0.0.0.0", 49153)) + + while True: + # This allows us to block on multiple sockets at the same time + buffer = bytearray(8192) # Max MTU + # received Data over socket + n_bytes, sender = main_sock.recvfrom_into(buffer) + self.log.trace("received {} bytes of data from {}" + .format(n_bytes, sender)) + try: + packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes]) + self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload())) + entry_xport = (1, NodeType.XPORT, 0) + pkt_type = packet.get_header().pkt_type + response = self.graph.handle_packet(packet, entry_xport, sender) + + if response is not None: + data = response.serialize() + self.log.trace("Returning Packet: {}" + .format(packet.to_string_with_payload())) + main_sock.sendto(bytes(data), sender) + except BaseException as ex: + self.log.warning("Unable to decode packet: {}" + .format(ex)) + raise ex diff --git a/mpm/python/usrp_mpm/simulator/noc_block_regs.py b/mpm/python/usrp_mpm/simulator/noc_block_regs.py new file mode 100644 index 000000000..b488ed43f --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/noc_block_regs.py @@ -0,0 +1,312 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +# Read Register Addresses +#! Register address of the protocol version +PROTOVER_ADDR = 0 * 4 +#! Register address of the port information +PORT_CNT_ADDR = 1 * 4 +#! Register address of the edge information +EDGE_CNT_ADDR = 2 * 4 +#! Register address of the device information +DEVICE_INFO_ADDR = 3 * 4 +#! Register address of the controlport information +CTRLPORT_CNT_ADDR = 4 * 4 +#! (Write) Register address of the flush and reset controls +FLUSH_RESET_ADDR = 1 * 4 + +#! Base address of the adjacency list +ADJACENCY_BASE_ADDR = 0x10000 +#! Each port is allocated this many registers in the backend register space +REGS_PER_PORT = 16 + +REG_RX_MAX_WORDS_PER_PKT = 0x28 +REG_RX_CMD_NUM_WORDS_HI = 0x1C +REG_RX_CMD_NUM_WORDS_LO = 0x18 +REG_RX_CMD = 0x14 + +RX_CMD_CONTINUOUS = 0x2 +RX_CMD_STOP = 0x0 +RX_CMD_FINITE = 0x1 + +RADIO_BASE_ADDR = 0x1000 +REG_CHAN_OFFSET = 128 # 0x80 + + +class StreamEndpointPort: + """Represents a port on a Stream Endpoint + + inst should be the same as the stream endpoint's node_inst + """ + def __init__(self, inst, port): + self.inst = inst + self.port = port + def to_tuple(self, num_stream_ep): + # The entry in an adjacency list is (blk_id, block_port) + # where blk_id for stream endpoints starts at 1 + # and Noc Blocks are addressed after the last stream endpoint + # See rfnoc_graph.cpp + return (1 + self.inst, self.port) + +class NocBlockPort: + """Represents a port on a Noc Block""" + def __init__(self, inst, port): + self.inst = inst + self.port = port + + def to_tuple(self, num_stream_ep): + # The entry in an adjacency list is (blk_id, block_port) + # where blk_id for stream endpoints starts at 1 + # and Noc Blocks are addressed after the last stream endpoint + # See rfnoc_graph.cpp + return (1 + num_stream_ep + self.inst, self.port) + +class NocBlock: + """Represents a NocBlock + + see client_zero.hpp:block_config_info + + NOTE: The mtu in bytes is calculated by (2**data_mtu * CHDR_W) + """ + def __init__(self, protover, num_inputs, num_outputs, ctrl_fifo_size, + ctrl_max_async_msgs, noc_id, data_mtu): + self.protover = protover + self.num_inputs = num_inputs + self.num_outputs = num_outputs + self.ctrl_fifo_size = ctrl_fifo_size + self.ctrl_max_async_msgs = ctrl_max_async_msgs + self.noc_id = noc_id + self.data_mtu = data_mtu + + def read_reg(self, reg_num): + # See client_zero.cpp + if reg_num == 0: + return self.read_config() + elif reg_num == 1: + return self.read_noc_id() + elif reg_num == 2: + return self.read_data() + else: + raise RuntimeError("NocBlock doesn't have a register #{}".format(reg_num)) + + def read_config(self): + return (self.protover & 0x3F) | \ + ((self.num_inputs & 0x3F) << 6) | \ + ((self.num_outputs & 0x3F) << 12) | \ + ((self.ctrl_fifo_size & 0x3F) << 18) | \ + ((self.ctrl_max_async_msgs & 0xFF) << 24) + + def read_data(self): + return (self.data_mtu & 0x3F) << 2 | \ + (1 << 1) # Permanently Set flush done + + def read_noc_id(self): + return self.noc_id & 0xFFFFFFFF + +class NocBlockRegs: + """Represents registers associated whith a group of NoCBlocks + roughly similar to UHD's client_zero + + NOTE: Many write operations are currently unimplemented and simply no-op + """ + def __init__(self, log, protover, has_xbar, num_xports, blocks, num_stream_ep, num_ctrl_ep, + device_type, adjacency_list, sample_width, samples_per_cycle, get_stream_spec, + create_tx_stream, stop_tx_stream): + """ Args: + protover -> FPGA Compat number + has_xbar -> Is there a chdr xbar? + num_xports -> how many xports + blocks -> list of NocBlock objects + num_stream_ep -> how many stream endpoints + num_ctrl_ep -> how many ctrl endpoints + device_type -> the device type (see defaults.hpp:device_type_t in UHD) + adjacency_list -> List of (Port, Port) tuples where + Port is either StreamEndpointPort or NocBlockPort + sample_width -> Sample width of radio + samples_per_cycle -> Samples produced by a radio cycle + get_stream_spec -> Callback which returns the current stream spec + create_tx_stream -> Callback which takes a block_index and starts a tx stream + stop_tx_stream -> Callback which takes a block_index and stops a tx stream + """ + self.log = log.getChild("Regs") + self.protover = protover + self.has_xbar = has_xbar + self.num_xports = num_xports + self.blocks = blocks + self.num_blocks = len(blocks) + self.num_stream_ep = num_stream_ep + self.num_ctrl_ep = num_ctrl_ep + self.device_type = device_type + self.adjacency_list = [(src_blk.to_tuple(num_stream_ep), dst_blk.to_tuple(num_stream_ep)) + for src_blk, dst_blk in adjacency_list] + self.adjacency_list_reg = NocBlockRegs._parse_adjacency_list(self.adjacency_list) + self.sample_width = sample_width + self.samples_per_cycle = samples_per_cycle + self.radio_reg = {} + self.get_stream_spec = get_stream_spec + self.create_tx_stream = create_tx_stream + self.stop_tx_stream = stop_tx_stream + + def read(self, addr): + # See client_zero.cpp + if addr == PROTOVER_ADDR: + return self.read_protover() + elif addr == PORT_CNT_ADDR: + return self.read_port_cnt() + elif addr == EDGE_CNT_ADDR: + return self.read_edge_cnt() + elif addr == DEVICE_INFO_ADDR: + return self.read_device_info() + elif addr == CTRLPORT_CNT_ADDR: + return self.read_ctrlport_cnt() + elif addr >= 0x40 and addr < 0x1000: + return self.read_port_reg(addr) + # See radio_control_impl.cpp + elif addr >= 0x1000 and addr < 0x10000: + return self.read_radio(addr) + # See client_zero.cpp + elif addr >= 0x10000: + return self.read_adjacency_list(addr) + else: + raise RuntimeError("Unsupported register addr: 0x{:08X}".format(addr)) + + def read_radio(self, addr): + if addr == 0x1000: + raise NotImplementedError() # TODO: This should be REG_COMPAT + elif addr == 0x1004: + return self.read_radio_width() + else: + offset = addr - 0x1000 + chan = offset // 0x80 + radio_offset = offset % 0x80 + if radio_offset == 0x40: + return self.radio_reg + elif radio_offset == 0x3C: + return self.radio_reg + else: + raise NotImplementedError("Radio addr 0x{:08X} not implemented".format(addr)) + + def write_radio(self, addr, value): + """Write a value to radio registers + + See radio_control_impl.cpp + """ + offset = addr - 0x1000 + assert offset >= 0 + chan = offset // 0x80 + if chan > 0: # For now, just operate as if there is one channel + self.log.warn("Channel {} not suported".format(chan)) + return + reg = offset % 0x80 + if reg == REG_RX_MAX_WORDS_PER_PKT: + self.get_stream_spec().packet_samples = value + elif reg == REG_RX_CMD_NUM_WORDS_HI: + self.get_stream_spec().set_num_words_hi(value) + elif reg == REG_RX_CMD_NUM_WORDS_LO: + self.get_stream_spec().set_num_words_lo(value) + elif reg == REG_RX_CMD: + if value & (1 << 31) != 0: + self.log.warn("Timed Streams are not supported. Starting immediately") + value = value & ~(1 << 31) # Clear the flag + if value == RX_CMD_STOP: + sep_block_id = self.resolve_ep_towards_outputs((self.get_radio_port(), chan)) + self.stop_tx_stream(sep_block_id) + return + elif value == RX_CMD_CONTINUOUS: + self.get_stream_spec().is_continuous = True + elif value == RX_CMD_FINITE: + self.get_stream_spec().is_continuous = False + else: + raise RuntimeError("Unknown Stream RX_CMD: {:08X}".format(value)) + sep_block_id = self.resolve_ep_towards_outputs((self.get_radio_port(), chan)) + self.create_tx_stream(sep_block_id) + + def resolve_ep_towards_outputs(self, block_id): + """Follow dataflow downstream through the adjacency list until + a stream_endpoint is encountered + """ + for src_blk, dst_blk in self.adjacency_list: + if src_blk == block_id: + dst_index, dst_port = dst_blk + if dst_index <= self.num_stream_ep: + return dst_blk + else: + return self.resolve_ep_towards_outputs(dst_blk) + + def get_radio_port(self): + """Returns the block_id of the radio block""" + radio_noc_id = 0x12AD1000 + for i, block in enumerate(self.blocks): + if block.noc_id == radio_noc_id: + return i + 1 + self.num_stream_ep + + # This is the FPGA compat number + def read_protover(self): + return 0xFFFF & self.protover + + def read_port_cnt(self): + return (self.num_stream_ep & 0x3FF) | \ + ((self.num_blocks & 0x3FF) << 10) | \ + ((self.num_xports & 0x3FF) << 20) | \ + ((1 if self.has_xbar else 0) << 31) + + def read_edge_cnt(self): + return len(self.adjacency_list) + + def read_device_info(self): + return (self.device_type & 0xFFFF) << 16 + + def read_ctrlport_cnt(self): + return (self.num_ctrl_ep & 0x3FF) + + def read_adjacency_list(self, addr): + offset = addr & 0xFFFF + if offset == 0: + self.log.debug("Adjacency List has {} entries".format(len(self.adjacency_list_reg))) + return len(self.adjacency_list_reg) + else: + assert(offset % 4 == 0) + index = (offset // 4) - 1 + return self.adjacency_list_reg[index] + + def write(self, addr, value): + if addr == 0x1040 or addr == 0x10C0: + self.log.trace("Storing value: 0x:{:08X} to self.radio_reg for data loopback test".format(value)) + self.radio_reg = value + # assuming 2 channels, out of bounds is + # BASE + 2 * CHAN_OFFSET = 0x1000 + 2 * 0x80 = 0x1100 + elif 0x1000 <= addr < 0x1100: + self.write_radio(addr, value) + + def read_port_reg(self, addr): + port = addr // 0x40 + if port < self.num_stream_ep: + raise NotImplementedError() + else: + block = port - self.num_stream_ep - 1 + offset = (addr % 0x40) // 4 + return self.blocks[block].read_reg(offset) + + def read_radio_width(self): + return (self.samples_per_cycle & 0xFFFF) | \ + ((self.sample_width & 0xFFFF) << 16) + + @staticmethod + def _parse_adjacency_list(adj_list): + """Serialize an adjacency list from the form of + [((src_blk, src_port), (dst_blk, dst_port))] + + See client_zero.cpp:client_zero#_get_adjacency_list() + """ + def pack(blocks): + src_blk, src_port = blocks[0] + dst_blk, dst_port = blocks[1] + return ((src_blk & 0x3FF) << 22) | \ + ((src_port & 0x3F) << 16) | \ + ((dst_blk & 0x3FF) << 6) | \ + ((dst_port & 0x3F) << 0) + return [pack(blocks) for blocks in adj_list] + diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py new file mode 100644 index 000000000..42210ab6e --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py @@ -0,0 +1,554 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module includes all of the components necessary to simulate the +configuration of a NoC Core and NoC Blocks. + +This module handles and responds to Management and Ctrl Packets. It +also instantiates the registers and acts as an interface between +the chdr packets on the network and the registers. +""" +from enum import IntEnum +from uhd.chdr import PacketType, MgmtOp, MgmtOpCode, MgmtOpNodeInfo, \ + CtrlStatus, CtrlOpCode, MgmtOpCfg, MgmtOpSelDest +from .noc_block_regs import NocBlockRegs, NocBlock, StreamEndpointPort, NocBlockPort +from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED + +class StreamSpec: + """This class carries the configuration parameters of a Tx stream. + + total_samples, is_continuous, and packet_samples come from the + radio registers (noc_block_regs.py) + + sample_rate comes from an rpc to the daughterboard (through the + set_sample_rate method in chdr_sniffer.py) + + dst_epid comes from the source stream_ep + + addr comes from the xport passed through when routing to dst_epid + """ + LOW_MASK = 0xFFFFFFFF + HIGH_MASK = (0xFFFFFFFF) << 32 + def __init__(self): + self.total_samples = 0 + self.is_continuous = True + self.packet_samples = None + self.sample_rate = None + self.dst_epid = None + self.addr = None + + def set_num_words_lo(self, low): + """Set the low 32 bits of the total_samples field""" + self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \ + | (low & StreamSpec.LOW_MASK) + + def set_num_words_hi(self, high): + """Set the high 32 bits of the total_samples field""" + self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \ + | ((high & StreamSpec.LOW_MASK) << 32) + + def seconds_per_packet(self): + """Calculates how many seconds should be between each packet + transmit + """ + assert self.packet_samples != 0 + assert self.sample_rate != 0 + return self.packet_samples / self.sample_rate + + def __str__(self): + return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \ + " sample_rate: {}, dst_epid: {}, addr: {}}}" \ + .format(self.total_samples, self.is_continuous, self.packet_samples, + self.sample_rate, self.dst_epid, self.addr) + +def to_iter(index_func, length): + """Allows looping over an indexed object in a for-each loop""" + for i in range(length): + yield index_func(i) + +def _swap_src_dst(packet, payload): + """Swap the src_epid and the dst_epid of a packet""" + header = packet.get_header() + our_epid = header.dst_epid + header.dst_epid = payload.src_epid + payload.src_epid = our_epid + packet.set_header(header) + +class NodeType(IntEnum): + """The type of a node in a NoC Core + + RTS is a magic value used to determine when to return a packet to + the sender + """ + INVALID = 0 + XBAR = 1 + STRM_EP = 2 + XPORT = 3 + RTS = 0xFF + +RETURN_TO_SENDER = (0, NodeType.RTS, 0) + +class Node: + """Represents a node in a RFNoCGraph + + These objects are usually constructed by the caller of RFNoC Graph. + Initially, they have references to other nodes as indexes of the + list of nodes. The graph calls graph_init and from_index so this + node can initialize their references to node_ids instead. + + Subclasses can override _handle_<packet_type>_packet methods, and + _handle_default_packet is provided, which will receive packets whose + specific methods are not overridden + """ + def __init__(self, node_inst): + """node_inst should start at 0 and increase for every Node of + the same type that is constructed + """ + self.device_id = None + self.node_inst = node_inst + self.log = None + + def graph_init(self, log, device_id, **kwargs): + """This method is called to initialize the Node Graph""" + self.device_id = device_id + self.log = log + + def get_id(self): + """Return the NodeID (device_id, node_type, node_inst)""" + return (self.device_id, self.get_type(), self.node_inst) + + def get_type(self): + """Returns the NodeType of this node""" + raise NotImplementedError + + def handle_packet(self, packet, **kwargs): + """Processes a packet + + The return value should be a node_id or RETURN_TO_SENDER + """ + packet_type = packet.get_header().pkt_type + next_node = None + if packet_type == PacketType.MGMT: + next_node = self._handle_mgmt_packet(packet, **kwargs) + elif packet_type == PacketType.CTRL: + next_node = self._handle_ctrl_packet(packet, **kwargs) + elif packet_type in (PacketType.DATA_W_TS, PacketType.DATA_NO_TS): + next_node = self._handle_data_packet(packet, **kwargs) + elif packet_type == PacketType.STRS: + next_node = self._handle_strs_packet(packet, **kwargs) + elif packet_type == PacketType.STRC: + next_node = self._handle_strc_packet(packet, **kwargs) + else: + raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type)) + if next_node == NotImplemented: + next_node = self._handle_default_packet(packet, **kwargs) + return next_node + + def _handle_mgmt_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_ctrl_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_data_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_strc_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_strs_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_default_packet(self, packet, **kwargs): + raise RuntimeError("{} has no operation defined for a {} packet" + .format(self.__class__, packet.get_header().pkt_type)) + + def from_index(self, nodes): + """Initialize this node's indexes to block_id references""" + pass + + def info_response(self, extended_info): + """Generate a node info response MgmtOp""" + return MgmtOp( + op_payload=MgmtOpNodeInfo(self.device_id, self.get_type(), + self.node_inst, extended_info), + op_code=MgmtOpCode.INFO_RESP + ) + +class XportNode(Node): + """Represents an Xport node + + When an Advertise Management op is received, the address and + src_epid of the packet is placed in self.addr_map + """ + def __init__(self, node_inst): + super().__init__(node_inst) + self.downstream = None + self.addr_map = {} + + def get_type(self): + return NodeType.XPORT + + def _handle_mgmt_packet(self, packet, addr, **kwargs): + send_upstream = False + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + packet.set_payload(payload) + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + payload.get_hop(0).add_op(self.info_response(0)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.ADVERTISE: + self.log.info("Advertise: {} | EPID:{} -> EPID:{}" + .format(self.get_id(), payload.src_epid, + packet.get_header().dst_epid)) + self.log.info("addr_map updated: EPID:{} -> {}".format(payload.src_epid, addr)) + self.addr_map[payload.src_epid] = addr + else: + raise NotImplementedError(op.op_code) + self.log.trace("Xport {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + else: + return self.downstream + + def _handle_default_packet(self, packet, **kwargs): + return self.downstream + +class XbarNode(Node): + """Represents a crossbar node + + self.routing_table stores a mapping of dst_epids to xbar ports + + port numbers start from 0. xports are addressed first + stream ep ports are then addressed where + the index of the first ep = len(xport_ports) + + NOTE: UHD inteprets the node_inst of a xbar as the port which + it is connected to. This differs from its handling of node_inst + for other types of nodes. + """ + def __init__(self, node_inst, ports, xport_ports): + super().__init__(node_inst) + self.nports = len(ports) + len(xport_ports) + self.nports_xport = len(xport_ports) + self.ports = xport_ports + ports + self.routing_table = {} + + def get_type(self): + return NodeType.XBAR + + def _handle_mgmt_packet(self, packet, **kwargs): + send_upstream = False + destination = None + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + payload.get_hop(0).add_op(self.info_response( + (self.nports_xport << 8) | self.nports)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.ADVERTISE: + self.log.info("Advertise: {}".format(self.get_id())) + elif op.op_code == MgmtOpCode.CFG_WR_REQ: + cfg = op.get_op_payload() + cfg = MgmtOpCfg.parse(cfg) + self.routing_table[cfg.addr] = cfg.data + self.log.debug("Xbar {} routing changed: {}" + .format(self.node_inst, self.routing_table)) + elif op.op_code == MgmtOpCode.SEL_DEST: + cfg = op.get_op_payload() + cfg = MgmtOpSelDest.parse(cfg) + dest_port = cfg.dest + destination = self.ports[dest_port] + else: + raise NotImplementedError(op.op_code) + self.log.trace("Xbar {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + elif destination is not None: + return destination + + def _handle_default_packet(self, packet, **kwargs): + dst_epid = packet.get_header().dst_epid + if dst_epid not in self.routing_table: + raise RuntimeError("Xbar no destination for packet (dst_epid: {})".format(dst_epid)) + return self.ports[self.routing_table[dst_epid]] + + def from_index(self, nodes): + """This iterates through the list of nodes and sets the + reference in self.ports to the node's id + """ + for i in range(len(self.ports)): + nodes_index = self.ports[i] + node = nodes[nodes_index] + self.ports[i] = node.get_id() + if node.__class__ is XportNode: + node.downstream = self.get_id() + elif node.__class__ is StreamEndpointNode: + node.upstream = self.get_id() + +class StreamEndpointNode(Node): + """Represents a Stream endpoint node + + This class contains a StreamEpRegs object. To clarify, management + packets access these registers, while control packets access the + registers of the noc_blocks which are held in the RFNoCGraph and + passed into handle_packet as the regs parameter + """ + def __init__(self, node_inst): + super().__init__(node_inst) + self.epid = node_inst + self.dst_epid = None + self.upstream = None + self.sep_config = None + # These 4 aren't configurable right now + self.has_data = True + self.has_ctrl = True + self.input_ports = 2 + self.output_ports = 2 + self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, + self.status_callback_out, self.status_callback_in, 4, 4 * 8000) + + def status_callback_out(self, status): + """Called by the ep_regs when on a write to the + REG_OSTRM_CTRL_STATUS register + """ + if status.cfg_start: + # This only creates a new TxWorker if the cfg_start flag is set + self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) + self.sep_config(self, True) + return STRM_STATUS_FC_ENABLED + + def status_callback_in(self, status): + """Called by the ep_regs on a write to the + REG_OSTRM_CTRL_STATUS register + """ + # This always triggers the graph to create a new RxWorker + self.sep_config(self, False) + return STRM_STATUS_FC_ENABLED + + def graph_init(self, log, device_id, sep_config, **kwargs): + super().graph_init(log, device_id) + self.ep_regs.log = log + self.sep_config = sep_config + + def get_type(self): + return NodeType.STRM_EP + + def get_epid(self): + return self.epid + + def set_epid(self, epid): + self.epid = epid + + def set_dst_epid(self, dst_epid): + self.dst_epid = dst_epid + + def _handle_mgmt_packet(self, packet, regs, **kwargs): + send_upstream = False + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + ext_info = 0 + ext_info |= (1 if self.has_ctrl else 0) << 0 + ext_info |= (1 if self.has_data else 0) << 1 + ext_info |= (self.input_ports & 0x3F) << 2 + ext_info |= (self.output_ports & 0x3F) << 8 + payload.get_hop(0).add_op(self.info_response(ext_info)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.CFG_RD_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + value = self.ep_regs.read(request.addr) + payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP, + MgmtOpCfg(request.addr, value))) + elif op.op_code == MgmtOpCode.CFG_WR_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + self.ep_regs.write(request.addr, request.data) + else: + raise NotImplementedError("op_code {} is not implemented for " + "StreamEndpointNode".format(op.op_code)) + self.log.trace("Stream Endpoint {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + + def _handle_ctrl_packet(self, packet, regs, **kwargs): + payload = packet.get_payload_ctrl() + _swap_src_dst(packet, payload) + if payload.status != CtrlStatus.OKAY: + raise RuntimeError("Control Status not OK: {}".format(payload.status)) + if payload.op_code == CtrlOpCode.READ: + payload.is_ack = True + payload.set_data([regs.read(payload.address)]) + elif payload.op_code == CtrlOpCode.WRITE: + payload.is_ack = True + regs.write(payload.address, payload.get_data()[0]) + else: + raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code)) + packet.set_payload(payload) + return RETURN_TO_SENDER + +class RFNoCGraph: + """This class holds all of the nodes of the NoC core and the Noc + blocks. + + It serves as an interface between the ChdrSniffer and the + individual blocks/nodes. + """ + def __init__(self, graph_list, log, device_id, create_tx_func, + stop_tx_func, send_strc_func, create_rx_func): + self.log = log.getChild("Graph") + self.send_strc = send_strc_func + self.device_id = device_id + self.stream_spec = StreamSpec() + self.create_tx = create_tx_func + self.stop_tx = stop_tx_func + self.create_rx = create_rx_func + num_stream_ep = 0 + for node in graph_list: + if node.__class__ is StreamEndpointNode: + num_stream_ep += 1 + node.graph_init(self.log, device_id, sep_config=self.prepare_stream_ep) + # These must be done sequentially so that device_id is initialized on all nodes + # before from_index is called on any node + for node in graph_list: + node.from_index(graph_list) + self.graph_map = {node.get_id(): node + for node in graph_list} + # For now, just use one radio block and hardcode it to the first stream endpoint + radio = NocBlock(1 << 16, 2, 2, 512, 1, 0x12AD1000, 16) + adj_list = [ + (StreamEndpointPort(0, 0), NocBlockPort(0, 0)), + (StreamEndpointPort(0, 1), NocBlockPort(0, 1)), + (NocBlockPort(0, 0), StreamEndpointPort(0, 0)), + (NocBlockPort(0, 1), StreamEndpointPort(0, 1)) + ] + self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], num_stream_ep, 1, 0xE320, + adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd, + self.radio_tx_stop) + + def radio_tx_cmd(self, sep_block_id): + """Triggers the creation of a TxWorker in the ChdrSniffer using + the current stream_spec. + + This method transforms the sep_block_id into an epid useable by + the transmit code + """ + # TODO: Use the port + sep_blk, sep_port = sep_block_id + # The NoC Block index for stream endpoints is the inst + 1 + # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() + sep_inst = sep_blk - 1 + sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) + stream_ep = self.graph_map[sep_id] + self.stream_spec.dst_epid = stream_ep.dst_epid + self.stream_spec.addr = self.dst_to_addr(stream_ep) + self.log.info("Streaming with StreamSpec:") + self.log.info(str(self.stream_spec)) + self.create_tx(stream_ep.epid, self.stream_spec) + self.stream_spec = StreamSpec() + + def radio_tx_stop(self, sep_block_id): + """Triggers the destuction of a TxWorker in the ChdrSniffer + + This method transforms the sep_block_id into an epid useable by + the transmit code + """ + sep_blk, sep_port = sep_block_id + sep_inst = sep_blk - 1 + # The NoC Block index for stream endpoints is the inst + 1 + # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() + sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) + src_epid = self.graph_map[sep_id].epid + self.stop_tx(src_epid) + + def get_device_id(self): + return self.device_id + + def prepare_stream_ep(self, stream_ep, is_tx): + """This is called by a stream_ep when it receives a status update + + Depending on whether it was a tx or rx status, it will either + trigger an strc packet or an rx stream + """ + if is_tx: + addr = self.dst_to_addr(stream_ep) + self.send_strc(stream_ep, addr) + else: + self.create_rx(stream_ep.epid) + + def change_spp(self, spp): + self.stream_spec.packet_samples = spp + + def find_ep_by_id(self, epid): + """Find a Stream Endpoint which identifies with epid""" + for node in self.graph_map.values(): + if node.__class__ is StreamEndpointNode: + if node.epid == epid: + return node + + # Fixme: This doesn't support intra-device connections + # i.e. connecting nodes by connecting two internal stream endpoints + # + # That would require looking at the adjacency list if we end up at + # another stream endpoint instead of an xport + def dst_to_addr(self, src_ep): + """This function traverses backwards through the node graph, + starting from src_ep and taking the path traveled by a packet + heading for src_ep.dst_epid. When it encounters an xport, it + returns the address associated with the dst_epid in the xport's + addr_map + """ + current_node = src_ep + dst_epid = src_ep.dst_epid + while current_node.__class__ != XportNode: + if current_node.__class__ == StreamEndpointNode: + current_node = self.graph_map[current_node.upstream] + else: + # current_node is a xbar + port = current_node.routing_table[dst_epid] + upstream_id = current_node.ports[port] + current_node = self.graph_map[upstream_id] + return current_node.addr_map[dst_epid] + + def handle_packet(self, packet, xport_input, addr): + """Given a chdr_packet, the id of an xport node to serve as an + entry point, and a source address, send the packet through the + node graph. + """ + node_id = xport_input + response_packet = None + while node_id is not None: + if node_id[1] == NodeType.RTS: + response_packet = packet + break + node = self.graph_map[node_id] + # If the node returns a value, it is the node id of the + # node the packet should be passed to next + # or RETURN_TO_SENDER + node_id = node.handle_packet(packet, regs=self.regs, addr=addr) + return response_packet + + def get_stream_spec(self): + return self.stream_spec diff --git a/mpm/python/usrp_mpm/simulator/stream_ep_regs.py b/mpm/python/usrp_mpm/simulator/stream_ep_regs.py new file mode 100644 index 000000000..defed345f --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/stream_ep_regs.py @@ -0,0 +1,129 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module holds the Emulated registers for a Stream Endpoint Node. +One of these objects is instantiated for each Stream Endpoint Node. +""" +from enum import IntEnum + +REG_EPID_SELF = 0x00 # RW +REG_RESET_AND_FLUSH = 0x04 # W +REG_OSTRM_CTRL_STATUS = 0x08 # RW +REG_OSTRM_DST_EPID = 0x0C # W +REG_OSTRM_FC_FREQ_BYTES_LO = 0x10 # W +REG_OSTRM_FC_FREQ_BYTES_HI = 0x14 # W +REG_OSTRM_FC_FREQ_PKTS = 0x18 # W +REG_OSTRM_FC_HEADROOM = 0x1C # W +REG_OSTRM_BUFF_CAP_BYTES_LO = 0x20 # R +REG_OSTRM_BUFF_CAP_BYTES_HI = 0x24 # R +REG_OSTRM_BUFF_CAP_PKTS = 0x28 # R +REG_OSTRM_SEQ_ERR_CNT = 0x2C # R +REG_OSTRM_DATA_ERR_CNT = 0x30 # R +REG_OSTRM_ROUTE_ERR_CNT = 0x34 # R +REG_ISTRM_CTRL_STATUS = 0x38 # RW + +RESET_AND_FLUSH_OSTRM = (1 << 0) +RESET_AND_FLUSH_ISTRM = (1 << 1) +RESET_AND_FLUSH_CTRL = (1 << 2) +RESET_AND_FLUSH_ALL = 0x7 + +STRM_STATUS_FC_ENABLED = 0x80000000 +STRM_STATUS_SETUP_ERR = 0x40000000 +STRM_STATUS_SETUP_PENDING = 0x20000000 + +class SwBuff(IntEnum): + """The size of the elements in a buffer""" + BUFF_U64 = 0 + BUFF_U32 = 1 + BUFF_U16 = 2 + BUFF_U8 = 3 + +class CtrlStatusWord: + """Represents a Control Status Word + + See mgmt_portal:BUILD_CTRL_STATUS_WORD() + """ + def __init__(self, cfg_start, xport_lossy, pyld_buff_fmt, mdata_buff_fmt, byte_swap): + self.cfg_start = cfg_start + self.xport_lossy = xport_lossy + self.pyld_buff_fmt = pyld_buff_fmt + self.mdata_buff_fmt = mdata_buff_fmt + self.byte_swap = byte_swap + + @classmethod + def parse(cls, val): + cfg_start = (val & 1) != 0 + xport_lossy = ((val >> 1) & 1) != 0 + pyld_buff_fmt = SwBuff((val >> 2) & 3) + mdata_buff_fmt = SwBuff((val >> 4) & 3) + byte_swap = ((val >> 6) & 1) != 0 + return cls(cfg_start, xport_lossy, pyld_buff_fmt, mdata_buff_fmt, byte_swap) + + def __str__(self): + return "CtrlStatusWord{{cfg_start: {}, xport_lossy: {}, " \ + "pyld_buff_fmt: {}, mdata_buff_fmt: {}, byte_swap: {}}}" \ + .format(self.cfg_start, self.xport_lossy, + self.pyld_buff_fmt, self.mdata_buff_fmt, self.byte_swap) + +class StreamEpRegs: + """Represents a set of registers associated with a stream endpoint + which can be accessed through management packets + + See mgmt_portal.cpp + """ + def __init__(self, get_epid, set_epid, set_dst_epid, update_status_out, + update_status_in, cap_pkts, cap_bytes): + self.get_epid = get_epid + self.set_epid = set_epid + self.set_dst_epid = set_dst_epid + self.log = None + self.out_ctrl_status = 0 + self.in_ctrl_status = 0 + self.update_status_out = update_status_out + self.update_status_in = update_status_in + self.cap_pkts = cap_pkts + self.cap_bytes = cap_bytes + + def read(self, addr): + if addr == REG_EPID_SELF: + return self.get_epid() + elif addr == REG_OSTRM_CTRL_STATUS: + return self.out_ctrl_status + elif addr == REG_ISTRM_CTRL_STATUS: + return self.in_ctrl_status + elif addr == REG_OSTRM_BUFF_CAP_BYTES_LO: + return self.cap_bytes & 0xFFFFFFFF + elif addr == REG_OSTRM_BUFF_CAP_BYTES_HI: + return (self.cap_bytes >> 32) & 0xFFFFFFFF + elif addr == REG_OSTRM_BUFF_CAP_PKTS: + return self.cap_pkts + else: + raise NotImplementedError("Unable to read addr 0x{:08X} from stream ep regs" + .format(addr)) + + def write(self, addr, val): + if addr == REG_EPID_SELF: + self.log.debug("Setting EPID to {}".format(val)) + self.set_epid(val) + elif addr == REG_OSTRM_CTRL_STATUS: + status = CtrlStatusWord.parse(val) + self.log.debug("Setting EPID Output Stream Ctrl Status: {}".format(status)) + new_status = self.update_status_out(status) + self.out_ctrl_status = new_status if new_status is not None else val + elif addr == REG_RESET_AND_FLUSH: + self.log.trace("Stream EP Regs Reset and Flush") + elif addr == REG_OSTRM_DST_EPID: + self.log.debug("Setting Dest EPID to {}".format(val)) + self.set_dst_epid(val) + elif REG_OSTRM_FC_FREQ_BYTES_LO <= addr <= REG_OSTRM_FC_HEADROOM: + pass # TODO: implement these Flow Control parameters + elif addr == REG_ISTRM_CTRL_STATUS: + status = CtrlStatusWord.parse(val) + self.log.debug("Setting EPID Input Stream Ctrl Status: {}".format(status)) + new_status = self.update_status_in(status) + self.in_ctrl_status = new_status if new_status is not None else val + else: + raise NotImplementedError("Unable to write addr 0x{:08X} from stream ep regs" + .format(addr)) |