# # Copyright 2020 Ettus Research, a National Instruments Brand # # SPDX-License-Identifier: GPL-3.0-or-later # """ This is a stream endpoint node, which is used in the RFNoCGraph class. It also houses the logic to create output and input streams. """ from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \ ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED from .chdr_stream import ChdrOutputStream, ChdrInputStream class StreamEndpointNode(Node): """Represents a Stream endpoint node This class contains a StreamEpRegs object. To clarify, management packets access these registers, while control packets access the registers of the noc_blocks which are held in the RFNoCGraph and passed into handle_packet as the regs parameter """ def __init__(self, node_inst, source_gen, sink_gen): super().__init__(node_inst) self.epid = node_inst self.dst_epid = None self.upstream = None # ---- These 4 aren't configurable right now self.has_data = True self.has_ctrl = True self.input_ports = 1 self.output_ports = 1 # ---- self.input_stream = None self.output_stream = None self.chdr_w = None self.send_wrapper = None self.dst_to_addr = None self.source_gen = source_gen self.sink_gen = sink_gen self.downstream_capacity = None self.strs_handlers = {} self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, self.ctrl_status_callback_out, self.ctrl_status_callback_in, 4, 4 * 8000) def ctrl_status_callback_out(self, status): """Called by the ep_regs when on a write to the REG_OSTRM_CTRL_STATUS register """ if status.cfg_start: # This only creates a new ChdrOutputStream if the cfg_start flag is set self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) addr = self.dst_to_addr(self) self.send_strc(addr) def handle_initial_strs(strs_packet): payload = strs_packet.get_payload_strs() assert payload.status == StrsStatus.OKAY, \ "Received STRS Packet Err: {}".format(payload.status) self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes) self.strs_handlers[self.epid] = handle_initial_strs return STRM_STATUS_FC_ENABLED def ctrl_status_callback_in(self, status): """Called by the ep_regs on a write to the REG_OSTRM_CTRL_STATUS register """ # This always triggers the graph to create a new ChdrInputStream self.begin_input() return STRM_STATUS_FC_ENABLED def graph_init(self, log, set_device_id, send_wrapper, chdr_w, dst_to_addr, **kwargs): super().graph_init(log, set_device_id) self.ep_regs.log = log self.chdr_w = chdr_w self.send_wrapper = send_wrapper self.dst_to_addr = dst_to_addr def get_type(self): return NodeType.STRM_EP def get_epid(self): """Get this endpoint's endpoint id""" return self.epid def set_epid(self, epid): """Set this endpoint's endpoint id""" self.epid = epid def set_dst_epid(self, dst_epid): """Set this endpoint's destination endpoint id""" self.dst_epid = dst_epid def _handle_mgmt_packet(self, packet, **kwargs): send_upstream = False payload = packet.get_payload_mgmt() our_hop = payload.pop_hop() for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): if op.op_code == MgmtOpCode.INFO_REQ: ext_info = 0 ext_info |= (1 if self.has_ctrl else 0) << 0 ext_info |= (1 if self.has_data else 0) << 1 ext_info |= (self.input_ports & 0x3F) << 2 ext_info |= (self.output_ports & 0x3F) << 8 payload.get_hop(0).add_op(self.info_response(ext_info)) elif op.op_code == MgmtOpCode.RETURN: send_upstream = True swap_src_dst(packet, payload) elif op.op_code == MgmtOpCode.NOP: pass elif op.op_code == MgmtOpCode.CFG_RD_REQ: request = MgmtOpCfg.parse(op.get_op_payload()) value = self.ep_regs.read(request.addr) payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP, MgmtOpCfg(request.addr, value))) elif op.op_code == MgmtOpCode.CFG_WR_REQ: request = MgmtOpCfg.parse(op.get_op_payload()) self.ep_regs.write(request.addr, request.data) else: raise NotImplementedError("op_code {} is not implemented for StreamEndpointNode" .format(op.op_code)) self.log.trace("Stream Endpoint {} processed hop:\n{}" .format(self.node_inst, our_hop)) packet.set_payload(payload) if send_upstream: return RETURN_TO_SENDER self.log.trace("Stream Endpoint {} received packet:\n{}" .format(self.node_inst, packet)) def _handle_ctrl_packet(self, packet, regs, **kwargs): payload = packet.get_payload_ctrl() swap_src_dst(packet, payload) if payload.status != CtrlStatus.OKAY: raise RuntimeError("Control Status not OK: {}".format(payload.status)) if payload.op_code == CtrlOpCode.READ: payload.is_ack = True payload.set_data([regs.read(payload.address)]) elif payload.op_code == CtrlOpCode.WRITE: payload.is_ack = True regs.write(payload.address, payload.get_data()[0]) else: raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code)) packet.set_payload(payload) return RETURN_TO_SENDER def _handle_data_packet(self, packet, num_bytes, sender, **kwargs): assert self.input_stream is not None self.input_stream.queue_packet(packet, num_bytes, sender) def _handle_strs_packet(self, packet, **kwargs): header = packet.get_header() if header.dst_epid in self.strs_handlers: self.strs_handlers.pop(header.dst_epid)(packet) else: self.output_stream.queue_packet(packet) def _handle_strc_packet(self, packet, **kwargs): self._handle_data_packet(packet, **kwargs) def send_strc(self, addr): """Send a Stream Command packet from the specified stream_ep to the specified address. This is not handled in ChdrOutputStream because the STRC must be dispatched before UHD configures the samples per packet, which is required to complete a StreamSpec """ header = ChdrHeader() header.dst_epid = self.dst_epid header.pkt_type = PacketType.STRC payload = StrcPayload() payload.src_epid = self.epid payload.op_code = StrcOpCode.INIT payload.num_pkts = 10 payload.num_bytes = 10 * 1024 # 10 KB packet = ChdrPacket(self.chdr_w, header, payload) self.send_wrapper.send_packet(packet, addr) def begin_output(self, stream_spec): """Spin up a new ChdrOutputStream thread which transmits from src_epid according to stream_spec. This is triggered from RFNoC Graph when the radio receives a Stream Command """ # As of now, only one stream endpoint port per stream endpoint # is supported. assert self.output_stream is None, \ "Output Stream already running on epid: {}".format(self.epid) stream_spec.dst_epid = self.dst_epid stream_spec.capacity_packets = self.downstream_capacity[0] stream_spec.capacity_bytes = self.downstream_capacity[1] self.downstream_capacity = None self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(), stream_spec, self.send_wrapper) def end_output(self): """Stops src_epid's current transmission. This opens up the sep to new transmissions in the future. This is triggered either by the RFNoC Graph when the radio receives a Stop Stream command or when the transmission has no more samples to send. """ self.output_stream.finish() self.output_stream = None def begin_input(self): """Spin up a new ChdrInputStream thread which receives all data and strc This is triggered by RFNoC Graph when there is a write to the REG_ISTRM_CTRL_STATUS register """ # As of now, only one stream endpoint port per stream endpoint # is supported. # Rx streams aren't explicitly ended by UHD. If we try to start # a new one on the same epid, just quietly close the old one. if self.input_stream is not None: self.input_stream.finish() self.input_stream = ChdrInputStream(self.log, self.chdr_w, self.sink_gen(), self.send_wrapper, self.epid)