diff options
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/stream_endpoint_node.py')
-rw-r--r-- | mpm/python/usrp_mpm/simulator/stream_endpoint_node.py | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py new file mode 100644 index 000000000..b4477ee47 --- /dev/null +++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py @@ -0,0 +1,206 @@ +# +# Copyright 2020 Ettus Research, a National Instruments Brand +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +""" +This is a stream endpoint node, which is used in the RFNoCGraph class. +It also houses the logic to create output and input streams. +""" +from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \ + ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket +from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER +from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED +from .chdr_stream import ChdrOutputStream, ChdrInputStream + +class StreamEndpointNode(Node): + """Represents a Stream endpoint node + + This class contains a StreamEpRegs object. To clarify, management + packets access these registers, while control packets access the + registers of the noc_blocks which are held in the RFNoCGraph and + passed into handle_packet as the regs parameter + """ + def __init__(self, node_inst, source_gen, sink_gen): + super().__init__(node_inst) + self.epid = node_inst + self.dst_epid = None + self.upstream = None + # ---- These 4 aren't configurable right now + self.has_data = True + self.has_ctrl = True + self.input_ports = 1 + self.output_ports = 1 + # ---- + self.input_stream = None + self.output_stream = None + self.chdr_w = None + self.send_wrapper = None + self.dst_to_addr = None + self.source_gen = source_gen + self.sink_gen = sink_gen + self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, + self.ctrl_status_callback_out, self.ctrl_status_callback_in, + 4, 4 * 8000) + + def ctrl_status_callback_out(self, status): + """Called by the ep_regs when on a write to the + REG_OSTRM_CTRL_STATUS register + """ + if status.cfg_start: + # This only creates a new ChdrOutputStream if the cfg_start flag is set + self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) + addr = self.dst_to_addr(self) + self.send_strc(addr) + return STRM_STATUS_FC_ENABLED + + def ctrl_status_callback_in(self, status): + """Called by the ep_regs on a write to the + REG_OSTRM_CTRL_STATUS register + """ + # This always triggers the graph to create a new ChdrInputStream + self.begin_input() + return STRM_STATUS_FC_ENABLED + + def graph_init(self, log, set_device_id, send_wrapper, chdr_w, dst_to_addr, **kwargs): + super().graph_init(log, set_device_id) + self.ep_regs.log = log + self.chdr_w = chdr_w + self.send_wrapper = send_wrapper + self.dst_to_addr = dst_to_addr + + def get_type(self): + return NodeType.STRM_EP + + def get_epid(self): + """Get this endpoint's endpoint id""" + return self.epid + + def set_epid(self, epid): + """Set this endpoint's endpoint id""" + self.epid = epid + + def set_dst_epid(self, dst_epid): + """Set this endpoint's destination endpoint id""" + self.dst_epid = dst_epid + + def _handle_mgmt_packet(self, packet, **kwargs): + send_upstream = False + payload = packet.get_payload_mgmt() + our_hop = payload.pop_hop() + for op in to_iter(our_hop.get_op, our_hop.get_num_ops()): + if op.op_code == MgmtOpCode.INFO_REQ: + ext_info = 0 + ext_info |= (1 if self.has_ctrl else 0) << 0 + ext_info |= (1 if self.has_data else 0) << 1 + ext_info |= (self.input_ports & 0x3F) << 2 + ext_info |= (self.output_ports & 0x3F) << 8 + payload.get_hop(0).add_op(self.info_response(ext_info)) + elif op.op_code == MgmtOpCode.RETURN: + send_upstream = True + swap_src_dst(packet, payload) + elif op.op_code == MgmtOpCode.NOP: + pass + elif op.op_code == MgmtOpCode.CFG_RD_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + value = self.ep_regs.read(request.addr) + payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP, + MgmtOpCfg(request.addr, value))) + elif op.op_code == MgmtOpCode.CFG_WR_REQ: + request = MgmtOpCfg.parse(op.get_op_payload()) + self.ep_regs.write(request.addr, request.data) + else: + raise NotImplementedError("op_code {} is not implemented for StreamEndpointNode" + .format(op.op_code)) + self.log.trace("Stream Endpoint {} processed hop:\n{}" + .format(self.node_inst, our_hop)) + packet.set_payload(payload) + if send_upstream: + return RETURN_TO_SENDER + self.log.trace("Stream Endpoint {} received packet:\n{}" + .format(self.node_inst, packet)) + + def _handle_ctrl_packet(self, packet, regs, **kwargs): + payload = packet.get_payload_ctrl() + swap_src_dst(packet, payload) + if payload.status != CtrlStatus.OKAY: + raise RuntimeError("Control Status not OK: {}".format(payload.status)) + if payload.op_code == CtrlOpCode.READ: + payload.is_ack = True + payload.set_data([regs.read(payload.address)]) + elif payload.op_code == CtrlOpCode.WRITE: + payload.is_ack = True + regs.write(payload.address, payload.get_data()[0]) + else: + raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code)) + packet.set_payload(payload) + return RETURN_TO_SENDER + + def _handle_data_packet(self, packet, num_bytes, sender, **kwargs): + assert self.input_stream is not None + self.input_stream.queue_packet(packet, num_bytes, sender) + + def _handle_strs_packet(self, packet, **kwargs): + pass # TODO: Implement flow control for output streams + + def _handle_strc_packet(self, packet, **kwargs): + self._handle_data_packet(packet, **kwargs) + + def send_strc(self, addr): + """Send a Stream Command packet from the specified stream_ep + to the specified address. + + This is not handled in ChdrOutputStream because the STRC must be + dispatched before UHD configures the samples per packet, + which is required to complete a StreamSpec + """ + header = ChdrHeader() + header.dst_epid = self.dst_epid + header.pkt_type = PacketType.STRC + payload = StrcPayload() + payload.src_epid = self.epid + payload.op_code = StrcOpCode.INIT + packet = ChdrPacket(self.chdr_w, header, payload) + self.send_wrapper.send_packet(packet, addr) + + def begin_output(self, stream_spec): + """Spin up a new ChdrOutputStream thread which transmits from src_epid + according to stream_spec. + + This is triggered from RFNoC Graph when the radio receives a + Stream Command + """ + # As of now, only one stream endpoint port per stream endpoint + # is supported. + assert self.output_stream is None, \ + "Output Stream already running on epid: {}".format(self.epid) + stream_spec.dst_epid = self.dst_epid + self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(), + stream_spec, self.send_wrapper) + + def end_output(self): + """Stops src_epid's current transmission. This opens up the sep + to new transmissions in the future. + + This is triggered either by the RFNoC Graph when the radio + receives a Stop Stream command or when the transmission has no + more samples to send. + """ + self.output_stream.finish() + self.output_stream = None + + def begin_input(self): + """Spin up a new ChdrInputStream thread which receives all data and strc + + This is triggered by RFNoC Graph when there is a write to the + REG_ISTRM_CTRL_STATUS register + """ + # As of now, only one stream endpoint port per stream endpoint + # is supported. + + # Rx streams aren't explicitly ended by UHD. If we try to start + # a new one on the same epid, just quietly close the old one. + if self.input_stream is not None: + self.input_stream.finish() + self.input_stream = ChdrInputStream(self.log, self.chdr_w, + self.sink_gen(), self.send_wrapper, self.epid) |