diff options
author | Samuel O'Brien <sam.obrien@ni.com> | 2020-07-21 14:20:24 -0500 |
---|---|---|
committer | Aaron Rossetto <aaron.rossetto@ni.com> | 2020-10-08 07:44:12 -0500 |
commit | ee9085a494d6f5030e49f5a47aff6a84008e0852 (patch) | |
tree | 1234c4a6aa5c88f3df290c1586e69e4a91d24f7d | |
parent | f54a22c60a0cbe990c9d3892f4c565d64226196b (diff) | |
download | uhd-ee9085a494d6f5030e49f5a47aff6a84008e0852.tar.gz uhd-ee9085a494d6f5030e49f5a47aff6a84008e0852.tar.bz2 uhd-ee9085a494d6f5030e49f5a47aff6a84008e0852.zip |
sim: Simulator CHDR Parsing and RFNoC Graph
This commit adds a simulated RFNoC Graph to the simulator. It is also
able to process management and control packets which can traverse the
graph and read from simulated registers. Stub callbacks for creating
streams have been provided but are not implemented yet.
Signed-off-by: Samuel O'Brien <sam.obrien@ni.com>
-rw-r--r-- | mpm/python/usrp_mpm/periph_manager/sim.py | 13 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/CMakeLists.txt | 4 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_sniffer.py | 107 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/noc_block_regs.py | 312 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/rfnoc_graph.py | 554 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/stream_ep_regs.py | 129 |
6 files changed, 1114 insertions, 5 deletions
diff --git a/mpm/python/usrp_mpm/periph_manager/sim.py b/mpm/python/usrp_mpm/periph_manager/sim.py index 7c8baeb8f..7f9a610a7 100644 --- a/mpm/python/usrp_mpm/periph_manager/sim.py +++ b/mpm/python/usrp_mpm/periph_manager/sim.py @@ -16,6 +16,7 @@ from usrp_mpm.mpmlog import get_logger from usrp_mpm.rpc_server import no_claim from usrp_mpm.periph_manager import PeriphManagerBase from usrp_mpm.simulator.sim_dboard_catalina import SimulatedCatalinaDboard +from usrp_mpm.simulator.chdr_sniffer import ChdrSniffer CLOCK_SOURCE_INTERNAL = "internal" @@ -67,8 +68,6 @@ class sim(PeriphManagerBase): """ ######################################################################### # Overridables - # - # See PeriphManagerBase for documentation on these fields ######################################################################### description = "E320-Series Device - SIMULATED" pids = {0xE320: 'e320'} @@ -84,6 +83,8 @@ class sim(PeriphManagerBase): super().__init__() self.device_id = 1 + self.chdr_sniffer = ChdrSniffer(self.log, args) + # Unlike the real hardware drivers, if there is an exception here, # we just crash. No use missing an error when testing. self._init_peripherals(args) @@ -91,8 +92,9 @@ class sim(PeriphManagerBase): if not args.get('skip_boot_init', False): self.init(args) - def _simulator_frequency(self, freq): - self.log.debug("Setting Simulator Sample Frequency to {}".format(freq)) + def _simulator_sample_rate(self, freq): + self.log.debug("Setting Simulator Sample Rate to {}".format(freq)) + self.chdr_endpoint.set_sample_rate(freq) @classmethod def generate_device_info(cls, eeprom_md, mboard_info, dboard_infos): @@ -148,7 +150,8 @@ class sim(PeriphManagerBase): self.log.debug("Device info: {}".format(self.device_info)) def _init_dboards(self, dboard_infos, override_dboard_pids, default_args): - self.dboards.append(SimulatedCatalinaDboard(E320_DBOARD_SLOT_IDX, self._simulator_frequency)) + self.dboards.append(SimulatedCatalinaDboard( + E320_DBOARD_SLOT_IDX, self._simulator_sample_rate)) self.log.info("Found %d daughterboard(s).", len(self.dboards)) ########################################################################### diff --git a/mpm/python/usrp_mpm/simulator/CMakeLists.txt b/mpm/python/usrp_mpm/simulator/CMakeLists.txt index 6cc4ee441..82fcbd801 100644 --- a/mpm/python/usrp_mpm/simulator/CMakeLists.txt +++ b/mpm/python/usrp_mpm/simulator/CMakeLists.txt @@ -13,6 +13,10 @@ set(USRP_MPM_SIMULATOR_FILES ${CMAKE_CURRENT_SOURCE_DIR}/__init__.py ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard.py ${CMAKE_CURRENT_SOURCE_DIR}/sim_dboard_catalina.py + ${CMAKE_CURRENT_SOURCE_DIR}/chdr_sniffer.py + ${CMAKE_CURRENT_SOURCE_DIR}/noc_block_regs.py + ${CMAKE_CURRENT_SOURCE_DIR}/rfnoc_graph.py + ${CMAKE_CURRENT_SOURCE_DIR}/stream_ep_regs.py ) list(APPEND USRP_MPM_FILES ${USRP_MPM_SIMULATOR_FILES}) set(USRP_MPM_FILES ${USRP_MPM_FILES} PARENT_SCOPE) diff --git a/mpm/python/usrp_mpm/simulator/chdr_sniffer.py b/mpm/python/usrp_mpm/simulator/chdr_sniffer.py new file mode 100644 index 000000000..8ff467cf2 --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/chdr_sniffer.py @@ -0,0 +1,107 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module houses the ChdrSniffer class, which handles networking, +packet dispatch, and the nitty gritty of spinning up rx and tx workers. + +Note: Rx and Tx are reversed when compared to their definitions in the +UHD radio_control_impl.cpp file. Rx is when samples are coming to the +simulator. Tx is when samples are being sent by the simulator. + +TODO: This class is run based on threads for development simplicity. If +more throughput is desired, it should be rewritten to use processes +instead. This allows the socket workers to truly work in parallel. +Python threads are limited by holding the GIL while they are executing. +""" + +from threading import Thread +import socket +from uhd.chdr import ChdrPacket, ChdrWidth, PacketType +from .rfnoc_graph import XbarNode, XportNode, StreamEndpointNode, RFNoCGraph, NodeType + +CHDR_W = ChdrWidth.W64 + +class ChdrSniffer: + """This class is created by the sim periph_manager + It is responsible for opening sockets, dispatching all chdr packet + traffic to the appropriate destination, and responding to said + traffic. + + The extra_args parameter is passed in from the periph_manager, and + coresponds to the --default_args flag of usrp_hwd.py on the + command line + """ + def __init__(self, log, extra_args): + self.log = log.getChild("ChdrSniffer") + self.thread = Thread(target=self.socket_worker, daemon=True) + self.thread.start() + self.graph = RFNoCGraph(self.get_default_nodes(), self.log, 1, self.begin_tx, + self.end_tx, self.send_strc, self.begin_rx) + self.xport_map = {} + + def set_sample_rate(self, rate): + """Set the sample_rate of the next tx_stream. + + This method is called by the daughterboard. It coresponds to + sim_dboard.py:sim_db#set_catalina_clock_rate() + """ + self.graph.get_stream_spec().sample_rate = rate + + def get_default_nodes(self): + """Get a sensible NoC Core setup. This is the simplest + functional layout. It has one of each required component. + """ + nodes = [ + XportNode(0), + XbarNode(0, [2], [0]), + StreamEndpointNode(0) + ] + return nodes + + def send_strc(self, stream_ep, addr): + pass # TODO: currently not implemented + + def begin_tx(self, src_epid, stream_spec): + pass # TODO: currently not implemented + + def end_tx(self, src_epid): + pass # TODO: currently not implemented + + def begin_rx(self, dst_epid): + pass # TODO: currently not implemented + + def socket_worker(self): + """This is the method that runs in a background thread. It + blocks on the CHDR socket and processes packets as they come + in. + """ + self.log.info("Starting ChdrSniffer Thread") + main_sock = socket.socket(socket.AF_INET, + socket.SOCK_DGRAM) + main_sock.bind(("0.0.0.0", 49153)) + + while True: + # This allows us to block on multiple sockets at the same time + buffer = bytearray(8192) # Max MTU + # received Data over socket + n_bytes, sender = main_sock.recvfrom_into(buffer) + self.log.trace("received {} bytes of data from {}" + .format(n_bytes, sender)) + try: + packet = ChdrPacket.deserialize(CHDR_W, buffer[:n_bytes]) + self.log.trace("Decoded Packet: {}".format(packet.to_string_with_payload())) + entry_xport = (1, NodeType.XPORT, 0) + pkt_type = packet.get_header().pkt_type + response = self.graph.handle_packet(packet, entry_xport, sender) + + if response is not None: + data = response.serialize() + self.log.trace("Returning Packet: {}" + .format(packet.to_string_with_payload())) + main_sock.sendto(bytes(data), sender) + except BaseException as ex: + self.log.warning("Unable to decode packet: {}" + .format(ex)) + raise ex diff --git a/mpm/python/usrp_mpm/simulator/noc_block_regs.py b/mpm/python/usrp_mpm/simulator/noc_block_regs.py new file mode 100644 index 000000000..b488ed43f --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/noc_block_regs.py @@ -0,0 +1,312 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +# Read Register Addresses +#! Register address of the protocol version +PROTOVER_ADDR = 0 * 4 +#! Register address of the port information +PORT_CNT_ADDR = 1 * 4 +#! Register address of the edge information +EDGE_CNT_ADDR = 2 * 4 +#! Register address of the device information +DEVICE_INFO_ADDR = 3 * 4 +#! Register address of the controlport information +CTRLPORT_CNT_ADDR = 4 * 4 +#! (Write) Register address of the flush and reset controls +FLUSH_RESET_ADDR = 1 * 4 + +#! Base address of the adjacency list +ADJACENCY_BASE_ADDR = 0x10000 +#! Each port is allocated this many registers in the backend register space +REGS_PER_PORT = 16 + +REG_RX_MAX_WORDS_PER_PKT = 0x28 +REG_RX_CMD_NUM_WORDS_HI = 0x1C +REG_RX_CMD_NUM_WORDS_LO = 0x18 +REG_RX_CMD = 0x14 + +RX_CMD_CONTINUOUS = 0x2 +RX_CMD_STOP = 0x0 +RX_CMD_FINITE = 0x1 + +RADIO_BASE_ADDR = 0x1000 +REG_CHAN_OFFSET = 128 # 0x80 + + +class StreamEndpointPort: + """Represents a port on a Stream Endpoint + + inst should be the same as the stream endpoint's node_inst + """ + def __init__(self, inst, port): + self.inst = inst + self.port = port + def to_tuple(self, num_stream_ep): + # The entry in an adjacency list is (blk_id, block_port) + # where blk_id for stream endpoints starts at 1 + # and Noc Blocks are addressed after the last stream endpoint + # See rfnoc_graph.cpp + return (1 + self.inst, self.port) + +class NocBlockPort: + """Represents a port on a Noc Block""" + def __init__(self, inst, port): + self.inst = inst + self.port = port + + def to_tuple(self, num_stream_ep): + # The entry in an adjacency list is (blk_id, block_port) + # where blk_id for stream endpoints starts at 1 + # and Noc Blocks are addressed after the last stream endpoint + # See rfnoc_graph.cpp + return (1 + num_stream_ep + self.inst, self.port) + +class NocBlock: + """Represents a NocBlock + + see client_zero.hpp:block_config_info + + NOTE: The mtu in bytes is calculated by (2**data_mtu * CHDR_W) + """ + def __init__(self, protover, num_inputs, num_outputs, ctrl_fifo_size, + ctrl_max_async_msgs, noc_id, data_mtu): + self.protover = protover + self.num_inputs = num_inputs + self.num_outputs = num_outputs + self.ctrl_fifo_size = ctrl_fifo_size + self.ctrl_max_async_msgs = ctrl_max_async_msgs + self.noc_id = noc_id + self.data_mtu = data_mtu + + def read_reg(self, reg_num): + # See client_zero.cpp + if reg_num == 0: + return self.read_config() + elif reg_num == 1: + return self.read_noc_id() + elif reg_num == 2: + return self.read_data() + else: + raise RuntimeError("NocBlock doesn't have a register #{}".format(reg_num)) + + def read_config(self): + return (self.protover & 0x3F) | \ + ((self.num_inputs & 0x3F) << 6) | \ + ((self.num_outputs & 0x3F) << 12) | \ + ((self.ctrl_fifo_size & 0x3F) << 18) | \ + ((self.ctrl_max_async_msgs & 0xFF) << 24) + + def read_data(self): + return (self.data_mtu & 0x3F) << 2 | \ + (1 << 1) # Permanently Set flush done + + def read_noc_id(self): + return self.noc_id & 0xFFFFFFFF + +class NocBlockRegs: + """Represents registers associated whith a group of NoCBlocks + roughly similar to UHD's client_zero + + NOTE: Many write operations are currently unimplemented and simply no-op + """ + def __init__(self, log, protover, has_xbar, num_xports, blocks, num_stream_ep, num_ctrl_ep, + device_type, adjacency_list, sample_width, samples_per_cycle, get_stream_spec, + create_tx_stream, stop_tx_stream): + """ Args: + protover -> FPGA Compat number + has_xbar -> Is there a chdr xbar? + num_xports -> how many xports + blocks -> list of NocBlock objects + num_stream_ep -> how many stream endpoints + num_ctrl_ep -> how many ctrl endpoints + device_type -> the device type (see defaults.hpp:device_type_t in UHD) + adjacency_list -> List of (Port, Port) tuples where + Port is either StreamEndpointPort or NocBlockPort + sample_width -> Sample width of radio + samples_per_cycle -> Samples produced by a radio cycle + get_stream_spec -> Callback which returns the current stream spec + create_tx_stream -> Callback which takes a block_index and starts a tx stream + stop_tx_stream -> Callback which takes a block_index and stops a tx stream + """ + self.log = log.getChild("Regs") + self.protover = protover + self.has_xbar = has_xbar + self.num_xports = num_xports + self.blocks = blocks + self.num_blocks = len(blocks) + self.num_stream_ep = num_stream_ep + self.num_ctrl_ep = num_ctrl_ep + self.device_type = device_type + self.adjacency_list = [(src_blk.to_tuple(num_stream_ep), dst_blk.to_tuple(num_stream_ep)) + for src_blk, dst_blk in adjacency_list] + self.adjacency_list_reg = NocBlockRegs._parse_adjacency_list(self.adjacency_list) + self.sample_width = sample_width + self.samples_per_cycle = samples_per_cycle + self.radio_reg = {} + self.get_stream_spec = get_stream_spec + self.create_tx_stream = create_tx_stream + self.stop_tx_stream = stop_tx_stream + + def read(self, addr): + # See client_zero.cpp + if addr == PROTOVER_ADDR: + return self.read_protover() + elif addr == PORT_CNT_ADDR: + return self.read_port_cnt() + elif addr == EDGE_CNT_ADDR: + return self.read_edge_cnt() + elif addr == DEVICE_INFO_ADDR: + return self.read_device_info() + elif addr == CTRLPORT_CNT_ADDR: + return self.read_ctrlport_cnt() + elif addr >= 0x40 and addr < 0x1000: + return self.read_port_reg(addr) + # See radio_control_impl.cpp + elif addr >= 0x1000 and addr < 0x10000: + return self.read_radio(addr) + # See client_zero.cpp + elif addr >= 0x10000: + return self.read_adjacency_list(addr) + else: + raise RuntimeError("Unsupported register addr: 0x{:08X}".format(addr)) + + def read_radio(self, addr): + if addr == 0x1000: + raise NotImplementedError() # TODO: This should be REG_COMPAT + elif addr == 0x1004: + return self.read_radio_width() + else: + offset = addr - 0x1000 + chan = offset // 0x80 + radio_offset = offset % 0x80 + if radio_offset == 0x40: + return self.radio_reg + elif radio_offset == 0x3C: + return self.radio_reg + else: + raise NotImplementedError("Radio addr 0x{:08X} not implemented".format(addr)) + + def write_radio(self, addr, value): + """Write a value to radio registers + + See radio_control_impl.cpp + """ + offset = addr - 0x1000 + assert offset >= 0 + chan = offset // 0x80 + if chan > 0: # For now, just operate as if there is one channel + self.log.warn("Channel {} not suported".format(chan)) + return + reg = offset % 0x80 + if reg == REG_RX_MAX_WORDS_PER_PKT: + self.get_stream_spec().packet_samples = value + elif reg == REG_RX_CMD_NUM_WORDS_HI: + self.get_stream_spec().set_num_words_hi(value) + elif reg == REG_RX_CMD_NUM_WORDS_LO: + self.get_stream_spec().set_num_words_lo(value) + elif reg == REG_RX_CMD: + if value & (1 << 31) != 0: + self.log.warn("Timed Streams are not supported. Starting immediately") + value = value & ~(1 << 31) # Clear the flag + if value == RX_CMD_STOP: + sep_block_id = self.resolve_ep_towards_outputs((self.get_radio_port(), chan)) + self.stop_tx_stream(sep_block_id) + return + elif value == RX_CMD_CONTINUOUS: + self.get_stream_spec().is_continuous = True + elif value == RX_CMD_FINITE: + self.get_stream_spec().is_continuous = False + else: + raise RuntimeError("Unknown Stream RX_CMD: {:08X}".format(value)) + sep_block_id = self.resolve_ep_towards_outputs((self.get_radio_port(), chan)) + self.create_tx_stream(sep_block_id) + + def resolve_ep_towards_outputs(self, block_id): + """Follow dataflow downstream through the adjacency list until + a stream_endpoint is encountered + """ + for src_blk, dst_blk in self.adjacency_list: + if src_blk == block_id: + dst_index, dst_port = dst_blk + if dst_index <= self.num_stream_ep: + return dst_blk + else: + return self.resolve_ep_towards_outputs(dst_blk) + + def get_radio_port(self): + """Returns the block_id of the radio block""" + radio_noc_id = 0x12AD1000 + for i, block in enumerate(self.blocks): + if block.noc_id == radio_noc_id: + return i + 1 + self.num_stream_ep + + # This is the FPGA compat number + def read_protover(self): + return 0xFFFF & self.protover + + def read_port_cnt(self): + return (self.num_stream_ep & 0x3FF) | \ + ((self.num_blocks & 0x3FF) << 10) | \ + ((self.num_xports & 0x3FF) << 20) | \ + ((1 if self.has_xbar else 0) << 31) + + def read_edge_cnt(self): + return len(self.adjacency_list) + + def read_device_info(self): + return (self.device_type & 0xFFFF) << 16 + + def read_ctrlport_cnt(self): + return (self.num_ctrl_ep & 0x3FF) + + def read_adjacency_list(self, addr): + offset = addr & 0xFFFF + if offset == 0: + self.log.debug("Adjacency List has {} entries".format(len(self.adjacency_list_reg))) + return len(self.adjacency_list_reg) + else: + assert(offset % 4 == 0) + index = (offset // 4) - 1 + return self.adjacency_list_reg[index] + + def write(self, addr, value): + if addr == 0x1040 or addr == 0x10C0: + self.log.trace("Storing value: 0x:{:08X} to self.radio_reg for data loopback test".format(value)) + self.radio_reg = value + # assuming 2 channels, out of bounds is + # BASE + 2 * CHAN_OFFSET = 0x1000 + 2 * 0x80 = 0x1100 + elif 0x1000 <= addr < 0x1100: + self.write_radio(addr, value) + + def read_port_reg(self, addr): + port = addr // 0x40 + if port < self.num_stream_ep: + raise NotImplementedError() + else: + block = port - self.num_stream_ep - 1 + offset = (addr % 0x40) // 4 + return self.blocks[block].read_reg(offset) + + def read_radio_width(self): + return (self.samples_per_cycle & 0xFFFF) | \ + ((self.sample_width & 0xFFFF) << 16) + + @staticmethod + def _parse_adjacency_list(adj_list): + """Serialize an adjacency list from the form of + [((src_blk, src_port), (dst_blk, dst_port))] + + See client_zero.cpp:client_zero#_get_adjacency_list() + """ + def pack(blocks): + src_blk, src_port = blocks[0] + dst_blk, dst_port = blocks[1] + return ((src_blk & 0x3FF) << 22) | \ + ((src_port & 0x3F) << 16) | \ + ((dst_blk & 0x3FF) << 6) | \ + ((dst_port & 0x3F) << 0) + return [pack(blocks) for blocks in adj_list] + diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_graph.py b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py new file mode 100644 index 000000000..42210ab6e --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/rfnoc_graph.py @@ -0,0 +1,554 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module includes all of the components necessary to simulate the +configuration of a NoC Core and NoC Blocks. + +This module handles and responds to Management and Ctrl Packets. It +also instantiates the registers and acts as an interface between +the chdr packets on the network and the registers. +""" +from enum import IntEnum +from uhd.chdr import PacketType, MgmtOp, MgmtOpCode, MgmtOpNodeInfo, \ + CtrlStatus, CtrlOpCode, MgmtOpCfg, MgmtOpSelDest +from .noc_block_regs import NocBlockRegs, NocBlock, StreamEndpointPort, NocBlockPort +from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED + +class StreamSpec: + """This class carries the configuration parameters of a Tx stream. + + total_samples, is_continuous, and packet_samples come from the + radio registers (noc_block_regs.py) + + sample_rate comes from an rpc to the daughterboard (through the + set_sample_rate method in chdr_sniffer.py) + + dst_epid comes from the source stream_ep + + addr comes from the xport passed through when routing to dst_epid + """ + LOW_MASK = 0xFFFFFFFF + HIGH_MASK = (0xFFFFFFFF) << 32 + def __init__(self): + self.total_samples = 0 + self.is_continuous = True + self.packet_samples = None + self.sample_rate = None + self.dst_epid = None + self.addr = None + + def set_num_words_lo(self, low): + """Set the low 32 bits of the total_samples field""" + self.total_samples = (self.total_samples & (StreamSpec.HIGH_MASK)) \ + | (low & StreamSpec.LOW_MASK) + + def set_num_words_hi(self, high): + """Set the high 32 bits of the total_samples field""" + self.total_samples = (self.total_samples & StreamSpec.LOW_MASK) \ + | ((high & StreamSpec.LOW_MASK) << 32) + + def seconds_per_packet(self): + """Calculates how many seconds should be between each packet + transmit + """ + assert self.packet_samples != 0 + assert self.sample_rate != 0 + return self.packet_samples / self.sample_rate + + def __str__(self): + return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \ + " sample_rate: {}, dst_epid: {}, addr: {}}}" \ + .format(self.total_samples, self.is_continuous, self.packet_samples, + self.sample_rate, self.dst_epid, self.addr) + +def to_iter(index_func, length): + """Allows looping over an indexed object in a for-each loop""" + for i in range(length): + yield index_func(i) + +def _swap_src_dst(packet, payload): + """Swap the src_epid and the dst_epid of a packet""" + header = packet.get_header() + our_epid = header.dst_epid + header.dst_epid = payload.src_epid + payload.src_epid = our_epid + packet.set_header(header) + +class NodeType(IntEnum): + """The type of a node in a NoC Core + + RTS is a magic value used to determine when to return a packet to + the sender + """ + INVALID = 0 + XBAR = 1 + STRM_EP = 2 + XPORT = 3 + RTS = 0xFF + +RETURN_TO_SENDER = (0, NodeType.RTS, 0) + +class Node: + """Represents a node in a RFNoCGraph + + These objects are usually constructed by the caller of RFNoC Graph. + Initially, they have references to other nodes as indexes of the + list of nodes. The graph calls graph_init and from_index so this + node can initialize their references to node_ids instead. + + Subclasses can override _handle_<packet_type>_packet methods, and + _handle_default_packet is provided, which will receive packets whose + specific methods are not overridden + """ + def __init__(self, node_inst): + """node_inst should start at 0 and increase for every Node of + the same type that is constructed + """ + self.device_id = None + self.node_inst = node_inst + self.log = None + + def graph_init(self, log, device_id, **kwargs): + """This method is called to initialize the Node Graph""" + self.device_id = device_id + self.log = log + + def get_id(self): + """Return the NodeID (device_id, node_type, node_inst)""" + return (self.device_id, self.get_type(), self.node_inst) + + def get_type(self): + """Returns the NodeType of this node""" + raise NotImplementedError + + def handle_packet(self, packet, **kwargs): + """Processes a packet + + The return value should be a node_id or RETURN_TO_SENDER + """ + packet_type = packet.get_header().pkt_type + next_node = None + if packet_type == PacketType.MGMT: + next_node = self._handle_mgmt_packet(packet, **kwargs) + elif packet_type == PacketType.CTRL: + next_node = self._handle_ctrl_packet(packet, **kwargs) + elif packet_type in (PacketType.DATA_W_TS, PacketType.DATA_NO_TS): + next_node = self._handle_data_packet(packet, **kwargs) + elif packet_type == PacketType.STRS: + next_node = self._handle_strs_packet(packet, **kwargs) + elif packet_type == PacketType.STRC: + next_node = self._handle_strc_packet(packet, **kwargs) + else: + raise RuntimeError("Invalid Enum Value for PacketType: {}".format(packet_type)) + if next_node == NotImplemented: + next_node = self._handle_default_packet(packet, **kwargs) + return next_node + + def _handle_mgmt_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_ctrl_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_data_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_strc_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_strs_packet(self, packet, **kwargs): + return NotImplemented + + def _handle_default_packet(self, packet, **kwargs): + raise RuntimeError("{} has no operation defined for a {} packet" + .format(self.__class__, packet.get_header().pkt_type)) + + def from_index(self, nodes): + """Initialize this node's indexes to block_id references""" + pass + + def info_response(self, extended_info): + """Generate a node info response MgmtOp""" + return MgmtOp( + op_payload=MgmtOpNodeInfo(self.device_id, self.get_type(), + self.node_inst, extended_info), + op_code=MgmtOpCode.INFO_RESP + ) + +class XportNode(Node): + """Represents an Xport node + + When an Advertise Management op is received, the address and + src_epid of the packet is placed in self.addr_map + """ + def __init__(self, node_inst): + super().__init__(node_inst) + self.downstream = None + self.addr_map = {} + + def get_type(self): + return NodeType.XPORT + + def _handle_mgmt_packet(self, packet, addr, **kwargs): + send_upstream = False + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + packet.set_payload(payload) + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + payload.get_hop(0).add_op(self.info_response(0)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.ADVERTISE: + self.log.info("Advertise: {} | EPID:{} -> EPID:{}" + .format(self.get_id(), payload.src_epid, + packet.get_header().dst_epid)) + self.log.info("addr_map updated: EPID:{} -> {}".format(payload.src_epid, addr)) + self.addr_map[payload.src_epid] = addr + else: + raise NotImplementedError(op.op_code) + self.log.trace("Xport {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + else: + return self.downstream + + def _handle_default_packet(self, packet, **kwargs): + return self.downstream + +class XbarNode(Node): + """Represents a crossbar node + + self.routing_table stores a mapping of dst_epids to xbar ports + + port numbers start from 0. xports are addressed first + stream ep ports are then addressed where + the index of the first ep = len(xport_ports) + + NOTE: UHD inteprets the node_inst of a xbar as the port which + it is connected to. This differs from its handling of node_inst + for other types of nodes. + """ + def __init__(self, node_inst, ports, xport_ports): + super().__init__(node_inst) + self.nports = len(ports) + len(xport_ports) + self.nports_xport = len(xport_ports) + self.ports = xport_ports + ports + self.routing_table = {} + + def get_type(self): + return NodeType.XBAR + + def _handle_mgmt_packet(self, packet, **kwargs): + send_upstream = False + destination = None + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + payload.get_hop(0).add_op(self.info_response( + (self.nports_xport << 8) | self.nports)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.ADVERTISE: + self.log.info("Advertise: {}".format(self.get_id())) + elif op.op_code == MgmtOpCode.CFG_WR_REQ: + cfg = op.get_op_payload() + cfg = MgmtOpCfg.parse(cfg) + self.routing_table[cfg.addr] = cfg.data + self.log.debug("Xbar {} routing changed: {}" + .format(self.node_inst, self.routing_table)) + elif op.op_code == MgmtOpCode.SEL_DEST: + cfg = op.get_op_payload() + cfg = MgmtOpSelDest.parse(cfg) + dest_port = cfg.dest + destination = self.ports[dest_port] + else: + raise NotImplementedError(op.op_code) + self.log.trace("Xbar {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + elif destination is not None: + return destination + + def _handle_default_packet(self, packet, **kwargs): + dst_epid = packet.get_header().dst_epid + if dst_epid not in self.routing_table: + raise RuntimeError("Xbar no destination for packet (dst_epid: {})".format(dst_epid)) + return self.ports[self.routing_table[dst_epid]] + + def from_index(self, nodes): + """This iterates through the list of nodes and sets the + reference in self.ports to the node's id + """ + for i in range(len(self.ports)): + nodes_index = self.ports[i] + node = nodes[nodes_index] + self.ports[i] = node.get_id() + if node.__class__ is XportNode: + node.downstream = self.get_id() + elif node.__class__ is StreamEndpointNode: + node.upstream = self.get_id() + +class StreamEndpointNode(Node): + """Represents a Stream endpoint node + + This class contains a StreamEpRegs object. To clarify, management + packets access these registers, while control packets access the + registers of the noc_blocks which are held in the RFNoCGraph and + passed into handle_packet as the regs parameter + """ + def __init__(self, node_inst): + super().__init__(node_inst) + self.epid = node_inst + self.dst_epid = None + self.upstream = None + self.sep_config = None + # These 4 aren't configurable right now + self.has_data = True + self.has_ctrl = True + self.input_ports = 2 + self.output_ports = 2 + self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, + self.status_callback_out, self.status_callback_in, 4, 4 * 8000) + + def status_callback_out(self, status): + """Called by the ep_regs when on a write to the + REG_OSTRM_CTRL_STATUS register + """ + if status.cfg_start: + # This only creates a new TxWorker if the cfg_start flag is set + self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) + self.sep_config(self, True) + return STRM_STATUS_FC_ENABLED + + def status_callback_in(self, status): + """Called by the ep_regs on a write to the + REG_OSTRM_CTRL_STATUS register + """ + # This always triggers the graph to create a new RxWorker + self.sep_config(self, False) + return STRM_STATUS_FC_ENABLED + + def graph_init(self, log, device_id, sep_config, **kwargs): + super().graph_init(log, device_id) + self.ep_regs.log = log + self.sep_config = sep_config + + def get_type(self): + return NodeType.STRM_EP + + def get_epid(self): + return self.epid + + def set_epid(self, epid): + self.epid = epid + + def set_dst_epid(self, dst_epid): + self.dst_epid = dst_epid + + def _handle_mgmt_packet(self, packet, regs, **kwargs): + send_upstream = False + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + ext_info = 0 + ext_info |= (1 if self.has_ctrl else 0) << 0 + ext_info |= (1 if self.has_data else 0) << 1 + ext_info |= (self.input_ports & 0x3F) << 2 + ext_info |= (self.output_ports & 0x3F) << 8 + payload.get_hop(0).add_op(self.info_response(ext_info)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + _swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.CFG_RD_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + value = self.ep_regs.read(request.addr) + payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP, + MgmtOpCfg(request.addr, value))) + elif op.op_code == MgmtOpCode.CFG_WR_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + self.ep_regs.write(request.addr, request.data) + else: + raise NotImplementedError("op_code {} is not implemented for " + "StreamEndpointNode".format(op.op_code)) + self.log.trace("Stream Endpoint {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + + def _handle_ctrl_packet(self, packet, regs, **kwargs): + payload = packet.get_payload_ctrl() + _swap_src_dst(packet, payload) + if payload.status != CtrlStatus.OKAY: + raise RuntimeError("Control Status not OK: {}".format(payload.status)) + if payload.op_code == CtrlOpCode.READ: + payload.is_ack = True + payload.set_data([regs.read(payload.address)]) + elif payload.op_code == CtrlOpCode.WRITE: + payload.is_ack = True + regs.write(payload.address, payload.get_data()[0]) + else: + raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code)) + packet.set_payload(payload) + return RETURN_TO_SENDER + +class RFNoCGraph: + """This class holds all of the nodes of the NoC core and the Noc + blocks. + + It serves as an interface between the ChdrSniffer and the + individual blocks/nodes. + """ + def __init__(self, graph_list, log, device_id, create_tx_func, + stop_tx_func, send_strc_func, create_rx_func): + self.log = log.getChild("Graph") + self.send_strc = send_strc_func + self.device_id = device_id + self.stream_spec = StreamSpec() + self.create_tx = create_tx_func + self.stop_tx = stop_tx_func + self.create_rx = create_rx_func + num_stream_ep = 0 + for node in graph_list: + if node.__class__ is StreamEndpointNode: + num_stream_ep += 1 + node.graph_init(self.log, device_id, sep_config=self.prepare_stream_ep) + # These must be done sequentially so that device_id is initialized on all nodes + # before from_index is called on any node + for node in graph_list: + node.from_index(graph_list) + self.graph_map = {node.get_id(): node + for node in graph_list} + # For now, just use one radio block and hardcode it to the first stream endpoint + radio = NocBlock(1 << 16, 2, 2, 512, 1, 0x12AD1000, 16) + adj_list = [ + (StreamEndpointPort(0, 0), NocBlockPort(0, 0)), + (StreamEndpointPort(0, 1), NocBlockPort(0, 1)), + (NocBlockPort(0, 0), StreamEndpointPort(0, 0)), + (NocBlockPort(0, 1), StreamEndpointPort(0, 1)) + ] + self.regs = NocBlockRegs(self.log, 1 << 16, True, 1, [radio], num_stream_ep, 1, 0xE320, + adj_list, 8, 1, self.get_stream_spec, self.radio_tx_cmd, + self.radio_tx_stop) + + def radio_tx_cmd(self, sep_block_id): + """Triggers the creation of a TxWorker in the ChdrSniffer using + the current stream_spec. + + This method transforms the sep_block_id into an epid useable by + the transmit code + """ + # TODO: Use the port + sep_blk, sep_port = sep_block_id + # The NoC Block index for stream endpoints is the inst + 1 + # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() + sep_inst = sep_blk - 1 + sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) + stream_ep = self.graph_map[sep_id] + self.stream_spec.dst_epid = stream_ep.dst_epid + self.stream_spec.addr = self.dst_to_addr(stream_ep) + self.log.info("Streaming with StreamSpec:") + self.log.info(str(self.stream_spec)) + self.create_tx(stream_ep.epid, self.stream_spec) + self.stream_spec = StreamSpec() + + def radio_tx_stop(self, sep_block_id): + """Triggers the destuction of a TxWorker in the ChdrSniffer + + This method transforms the sep_block_id into an epid useable by + the transmit code + """ + sep_blk, sep_port = sep_block_id + sep_inst = sep_blk - 1 + # The NoC Block index for stream endpoints is the inst + 1 + # See rfnoc_graph.cpp:rfnoc_graph_impl#_init_sep_map() + sep_id = (self.get_device_id(), NodeType.STRM_EP, sep_inst) + src_epid = self.graph_map[sep_id].epid + self.stop_tx(src_epid) + + def get_device_id(self): + return self.device_id + + def prepare_stream_ep(self, stream_ep, is_tx): + """This is called by a stream_ep when it receives a status update + + Depending on whether it was a tx or rx status, it will either + trigger an strc packet or an rx stream + """ + if is_tx: + addr = self.dst_to_addr(stream_ep) + self.send_strc(stream_ep, addr) + else: + self.create_rx(stream_ep.epid) + + def change_spp(self, spp): + self.stream_spec.packet_samples = spp + + def find_ep_by_id(self, epid): + """Find a Stream Endpoint which identifies with epid""" + for node in self.graph_map.values(): + if node.__class__ is StreamEndpointNode: + if node.epid == epid: + return node + + # Fixme: This doesn't support intra-device connections + # i.e. connecting nodes by connecting two internal stream endpoints + # + # That would require looking at the adjacency list if we end up at + # another stream endpoint instead of an xport + def dst_to_addr(self, src_ep): + """This function traverses backwards through the node graph, + starting from src_ep and taking the path traveled by a packet + heading for src_ep.dst_epid. When it encounters an xport, it + returns the address associated with the dst_epid in the xport's + addr_map + """ + current_node = src_ep + dst_epid = src_ep.dst_epid + while current_node.__class__ != XportNode: + if current_node.__class__ == StreamEndpointNode: + current_node = self.graph_map[current_node.upstream] + else: + # current_node is a xbar + port = current_node.routing_table[dst_epid] + upstream_id = current_node.ports[port] + current_node = self.graph_map[upstream_id] + return current_node.addr_map[dst_epid] + + def handle_packet(self, packet, xport_input, addr): + """Given a chdr_packet, the id of an xport node to serve as an + entry point, and a source address, send the packet through the + node graph. + """ + node_id = xport_input + response_packet = None + while node_id is not None: + if node_id[1] == NodeType.RTS: + response_packet = packet + break + node = self.graph_map[node_id] + # If the node returns a value, it is the node id of the + # node the packet should be passed to next + # or RETURN_TO_SENDER + node_id = node.handle_packet(packet, regs=self.regs, addr=addr) + return response_packet + + def get_stream_spec(self): + return self.stream_spec diff --git a/mpm/python/usrp_mpm/simulator/stream_ep_regs.py b/mpm/python/usrp_mpm/simulator/stream_ep_regs.py new file mode 100644 index 000000000..defed345f --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/stream_ep_regs.py @@ -0,0 +1,129 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +"""This module holds the Emulated registers for a Stream Endpoint Node. +One of these objects is instantiated for each Stream Endpoint Node. +""" +from enum import IntEnum + +REG_EPID_SELF = 0x00 # RW +REG_RESET_AND_FLUSH = 0x04 # W +REG_OSTRM_CTRL_STATUS = 0x08 # RW +REG_OSTRM_DST_EPID = 0x0C # W +REG_OSTRM_FC_FREQ_BYTES_LO = 0x10 # W +REG_OSTRM_FC_FREQ_BYTES_HI = 0x14 # W +REG_OSTRM_FC_FREQ_PKTS = 0x18 # W +REG_OSTRM_FC_HEADROOM = 0x1C # W +REG_OSTRM_BUFF_CAP_BYTES_LO = 0x20 # R +REG_OSTRM_BUFF_CAP_BYTES_HI = 0x24 # R +REG_OSTRM_BUFF_CAP_PKTS = 0x28 # R +REG_OSTRM_SEQ_ERR_CNT = 0x2C # R +REG_OSTRM_DATA_ERR_CNT = 0x30 # R +REG_OSTRM_ROUTE_ERR_CNT = 0x34 # R +REG_ISTRM_CTRL_STATUS = 0x38 # RW + +RESET_AND_FLUSH_OSTRM = (1 << 0) +RESET_AND_FLUSH_ISTRM = (1 << 1) +RESET_AND_FLUSH_CTRL = (1 << 2) +RESET_AND_FLUSH_ALL = 0x7 + +STRM_STATUS_FC_ENABLED = 0x80000000 +STRM_STATUS_SETUP_ERR = 0x40000000 +STRM_STATUS_SETUP_PENDING = 0x20000000 + +class SwBuff(IntEnum): + """The size of the elements in a buffer""" + BUFF_U64 = 0 + BUFF_U32 = 1 + BUFF_U16 = 2 + BUFF_U8 = 3 + +class CtrlStatusWord: + """Represents a Control Status Word + + See mgmt_portal:BUILD_CTRL_STATUS_WORD() + """ + def __init__(self, cfg_start, xport_lossy, pyld_buff_fmt, mdata_buff_fmt, byte_swap): + self.cfg_start = cfg_start + self.xport_lossy = xport_lossy + self.pyld_buff_fmt = pyld_buff_fmt + self.mdata_buff_fmt = mdata_buff_fmt + self.byte_swap = byte_swap + + @classmethod + def parse(cls, val): + cfg_start = (val & 1) != 0 + xport_lossy = ((val >> 1) & 1) != 0 + pyld_buff_fmt = SwBuff((val >> 2) & 3) + mdata_buff_fmt = SwBuff((val >> 4) & 3) + byte_swap = ((val >> 6) & 1) != 0 + return cls(cfg_start, xport_lossy, pyld_buff_fmt, mdata_buff_fmt, byte_swap) + + def __str__(self): + return "CtrlStatusWord{{cfg_start: {}, xport_lossy: {}, " \ + "pyld_buff_fmt: {}, mdata_buff_fmt: {}, byte_swap: {}}}" \ + .format(self.cfg_start, self.xport_lossy, + self.pyld_buff_fmt, self.mdata_buff_fmt, self.byte_swap) + +class StreamEpRegs: + """Represents a set of registers associated with a stream endpoint + which can be accessed through management packets + + See mgmt_portal.cpp + """ + def __init__(self, get_epid, set_epid, set_dst_epid, update_status_out, + update_status_in, cap_pkts, cap_bytes): + self.get_epid = get_epid + self.set_epid = set_epid + self.set_dst_epid = set_dst_epid + self.log = None + self.out_ctrl_status = 0 + self.in_ctrl_status = 0 + self.update_status_out = update_status_out + self.update_status_in = update_status_in + self.cap_pkts = cap_pkts + self.cap_bytes = cap_bytes + + def read(self, addr): + if addr == REG_EPID_SELF: + return self.get_epid() + elif addr == REG_OSTRM_CTRL_STATUS: + return self.out_ctrl_status + elif addr == REG_ISTRM_CTRL_STATUS: + return self.in_ctrl_status + elif addr == REG_OSTRM_BUFF_CAP_BYTES_LO: + return self.cap_bytes & 0xFFFFFFFF + elif addr == REG_OSTRM_BUFF_CAP_BYTES_HI: + return (self.cap_bytes >> 32) & 0xFFFFFFFF + elif addr == REG_OSTRM_BUFF_CAP_PKTS: + return self.cap_pkts + else: + raise NotImplementedError("Unable to read addr 0x{:08X} from stream ep regs" + .format(addr)) + + def write(self, addr, val): + if addr == REG_EPID_SELF: + self.log.debug("Setting EPID to {}".format(val)) + self.set_epid(val) + elif addr == REG_OSTRM_CTRL_STATUS: + status = CtrlStatusWord.parse(val) + self.log.debug("Setting EPID Output Stream Ctrl Status: {}".format(status)) + new_status = self.update_status_out(status) + self.out_ctrl_status = new_status if new_status is not None else val + elif addr == REG_RESET_AND_FLUSH: + self.log.trace("Stream EP Regs Reset and Flush") + elif addr == REG_OSTRM_DST_EPID: + self.log.debug("Setting Dest EPID to {}".format(val)) + self.set_dst_epid(val) + elif REG_OSTRM_FC_FREQ_BYTES_LO <= addr <= REG_OSTRM_FC_HEADROOM: + pass # TODO: implement these Flow Control parameters + elif addr == REG_ISTRM_CTRL_STATUS: + status = CtrlStatusWord.parse(val) + self.log.debug("Setting EPID Input Stream Ctrl Status: {}".format(status)) + new_status = self.update_status_in(status) + self.in_ctrl_status = new_status if new_status is not None else val + else: + raise NotImplementedError("Unable to write addr 0x{:08X} from stream ep regs" + .format(addr)) |