aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/stream_endpoint_node.py')
-rw-r--r--mpm/python/usrp_mpm/simulator/stream_endpoint_node.py206
1 files changed, 206 insertions, 0 deletions
diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
new file mode 100644
index 000000000..b4477ee47
--- /dev/null
+++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
@@ -0,0 +1,206 @@
+#
+# Copyright 2020 Ettus Research, a National Instruments Brand
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+"""
+This is a stream endpoint node, which is used in the RFNoCGraph class.
+It also houses the logic to create output and input streams.
+"""
+from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \
+ ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket
+from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER
+from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
+from .chdr_stream import ChdrOutputStream, ChdrInputStream
+
+class StreamEndpointNode(Node):
+ """Represents a Stream endpoint node
+
+ This class contains a StreamEpRegs object. To clarify, management
+ packets access these registers, while control packets access the
+ registers of the noc_blocks which are held in the RFNoCGraph and
+ passed into handle_packet as the regs parameter
+ """
+ def __init__(self, node_inst, source_gen, sink_gen):
+ super().__init__(node_inst)
+ self.epid = node_inst
+ self.dst_epid = None
+ self.upstream = None
+ # ---- These 4 aren't configurable right now
+ self.has_data = True
+ self.has_ctrl = True
+ self.input_ports = 1
+ self.output_ports = 1
+ # ----
+ self.input_stream = None
+ self.output_stream = None
+ self.chdr_w = None
+ self.send_wrapper = None
+ self.dst_to_addr = None
+ self.source_gen = source_gen
+ self.sink_gen = sink_gen
+ self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
+ self.ctrl_status_callback_out, self.ctrl_status_callback_in,
+ 4, 4 * 8000)
+
+ def ctrl_status_callback_out(self, status):
+ """Called by the ep_regs when on a write to the
+ REG_OSTRM_CTRL_STATUS register
+ """
+ if status.cfg_start:
+ # This only creates a new ChdrOutputStream if the cfg_start flag is set
+ self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
+ addr = self.dst_to_addr(self)
+ self.send_strc(addr)
+ return STRM_STATUS_FC_ENABLED
+
+ def ctrl_status_callback_in(self, status):
+ """Called by the ep_regs on a write to the
+ REG_OSTRM_CTRL_STATUS register
+ """
+ # This always triggers the graph to create a new ChdrInputStream
+ self.begin_input()
+ return STRM_STATUS_FC_ENABLED
+
+ def graph_init(self, log, set_device_id, send_wrapper, chdr_w, dst_to_addr, **kwargs):
+ super().graph_init(log, set_device_id)
+ self.ep_regs.log = log
+ self.chdr_w = chdr_w
+ self.send_wrapper = send_wrapper
+ self.dst_to_addr = dst_to_addr
+
+ def get_type(self):
+ return NodeType.STRM_EP
+
+ def get_epid(self):
+ """Get this endpoint's endpoint id"""
+ return self.epid
+
+ def set_epid(self, epid):
+ """Set this endpoint's endpoint id"""
+ self.epid = epid
+
+ def set_dst_epid(self, dst_epid):
+ """Set this endpoint's destination endpoint id"""
+ self.dst_epid = dst_epid
+
+ def _handle_mgmt_packet(self, packet, **kwargs):
+ send_upstream = False
+ payload = packet.get_payload_mgmt()
+ our_hop = payload.pop_hop()
+ for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
+ if op.op_code == MgmtOpCode.INFO_REQ:
+ ext_info = 0
+ ext_info |= (1 if self.has_ctrl else 0) << 0
+ ext_info |= (1 if self.has_data else 0) << 1
+ ext_info |= (self.input_ports & 0x3F) << 2
+ ext_info |= (self.output_ports & 0x3F) << 8
+ payload.get_hop(0).add_op(self.info_response(ext_info))
+ elif op.op_code == MgmtOpCode.RETURN:
+ send_upstream = True
+ swap_src_dst(packet, payload)
+ elif op.op_code == MgmtOpCode.NOP:
+ pass
+ elif op.op_code == MgmtOpCode.CFG_RD_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ value = self.ep_regs.read(request.addr)
+ payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
+ MgmtOpCfg(request.addr, value)))
+ elif op.op_code == MgmtOpCode.CFG_WR_REQ:
+ request = MgmtOpCfg.parse(op.get_op_payload())
+ self.ep_regs.write(request.addr, request.data)
+ else:
+ raise NotImplementedError("op_code {} is not implemented for StreamEndpointNode"
+ .format(op.op_code))
+ self.log.trace("Stream Endpoint {} processed hop:\n{}"
+ .format(self.node_inst, our_hop))
+ packet.set_payload(payload)
+ if send_upstream:
+ return RETURN_TO_SENDER
+ self.log.trace("Stream Endpoint {} received packet:\n{}"
+ .format(self.node_inst, packet))
+
+ def _handle_ctrl_packet(self, packet, regs, **kwargs):
+ payload = packet.get_payload_ctrl()
+ swap_src_dst(packet, payload)
+ if payload.status != CtrlStatus.OKAY:
+ raise RuntimeError("Control Status not OK: {}".format(payload.status))
+ if payload.op_code == CtrlOpCode.READ:
+ payload.is_ack = True
+ payload.set_data([regs.read(payload.address)])
+ elif payload.op_code == CtrlOpCode.WRITE:
+ payload.is_ack = True
+ regs.write(payload.address, payload.get_data()[0])
+ else:
+ raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
+ packet.set_payload(payload)
+ return RETURN_TO_SENDER
+
+ def _handle_data_packet(self, packet, num_bytes, sender, **kwargs):
+ assert self.input_stream is not None
+ self.input_stream.queue_packet(packet, num_bytes, sender)
+
+ def _handle_strs_packet(self, packet, **kwargs):
+ pass # TODO: Implement flow control for output streams
+
+ def _handle_strc_packet(self, packet, **kwargs):
+ self._handle_data_packet(packet, **kwargs)
+
+ def send_strc(self, addr):
+ """Send a Stream Command packet from the specified stream_ep
+ to the specified address.
+
+ This is not handled in ChdrOutputStream because the STRC must be
+ dispatched before UHD configures the samples per packet,
+ which is required to complete a StreamSpec
+ """
+ header = ChdrHeader()
+ header.dst_epid = self.dst_epid
+ header.pkt_type = PacketType.STRC
+ payload = StrcPayload()
+ payload.src_epid = self.epid
+ payload.op_code = StrcOpCode.INIT
+ packet = ChdrPacket(self.chdr_w, header, payload)
+ self.send_wrapper.send_packet(packet, addr)
+
+ def begin_output(self, stream_spec):
+ """Spin up a new ChdrOutputStream thread which transmits from src_epid
+ according to stream_spec.
+
+ This is triggered from RFNoC Graph when the radio receives a
+ Stream Command
+ """
+ # As of now, only one stream endpoint port per stream endpoint
+ # is supported.
+ assert self.output_stream is None, \
+ "Output Stream already running on epid: {}".format(self.epid)
+ stream_spec.dst_epid = self.dst_epid
+ self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),
+ stream_spec, self.send_wrapper)
+
+ def end_output(self):
+ """Stops src_epid's current transmission. This opens up the sep
+ to new transmissions in the future.
+
+ This is triggered either by the RFNoC Graph when the radio
+ receives a Stop Stream command or when the transmission has no
+ more samples to send.
+ """
+ self.output_stream.finish()
+ self.output_stream = None
+
+ def begin_input(self):
+ """Spin up a new ChdrInputStream thread which receives all data and strc
+
+ This is triggered by RFNoC Graph when there is a write to the
+ REG_ISTRM_CTRL_STATUS register
+ """
+ # As of now, only one stream endpoint port per stream endpoint
+ # is supported.
+
+ # Rx streams aren't explicitly ended by UHD. If we try to start
+ # a new one on the same epid, just quietly close the old one.
+ if self.input_stream is not None:
+ self.input_stream.finish()
+ self.input_stream = ChdrInputStream(self.log, self.chdr_w,
+ self.sink_gen(), self.send_wrapper, self.epid)