diff options
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/stream_endpoint_node.py')
-rw-r--r-- | mpm/python/usrp_mpm/simulator/stream_endpoint_node.py | 21 |
1 files changed, 19 insertions, 2 deletions
diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py index b4477ee47..06aaaa046 100644 --- a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py +++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py @@ -8,7 +8,7 @@ This is a stream endpoint node, which is used in the RFNoCGraph class. It also houses the logic to create output and input streams. """ from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \ - ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket + ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED from .chdr_stream import ChdrOutputStream, ChdrInputStream @@ -39,6 +39,8 @@ class StreamEndpointNode(Node): self.dst_to_addr = None self.source_gen = source_gen self.sink_gen = sink_gen + self.downstream_capacity = None + self.strs_handlers = {} self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, self.ctrl_status_callback_out, self.ctrl_status_callback_in, 4, 4 * 8000) @@ -52,6 +54,12 @@ class StreamEndpointNode(Node): self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) addr = self.dst_to_addr(self) self.send_strc(addr) + def handle_initial_strs(strs_packet): + payload = strs_packet.get_payload_strs() + assert payload.status == StrsStatus.OKAY, \ + "Received STRS Packet Err: {}".format(payload.status) + self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes) + self.strs_handlers[self.epid] = handle_initial_strs return STRM_STATUS_FC_ENABLED def ctrl_status_callback_in(self, status): @@ -141,7 +149,11 @@ class StreamEndpointNode(Node): self.input_stream.queue_packet(packet, num_bytes, sender) def _handle_strs_packet(self, packet, **kwargs): - pass # TODO: Implement flow control for output streams + header = packet.get_header() + if header.dst_epid in self.strs_handlers: + self.strs_handlers.pop(header.dst_epid)(packet) + else: + self.output_stream.queue_packet(packet) def _handle_strc_packet(self, packet, **kwargs): self._handle_data_packet(packet, **kwargs) @@ -160,6 +172,8 @@ class StreamEndpointNode(Node): payload = StrcPayload() payload.src_epid = self.epid payload.op_code = StrcOpCode.INIT + payload.num_pkts = 10 + payload.num_bytes = 10 * 1024 # 10 KB packet = ChdrPacket(self.chdr_w, header, payload) self.send_wrapper.send_packet(packet, addr) @@ -175,6 +189,9 @@ class StreamEndpointNode(Node): assert self.output_stream is None, \ "Output Stream already running on epid: {}".format(self.epid) stream_spec.dst_epid = self.dst_epid + stream_spec.capacity_packets = self.downstream_capacity[0] + stream_spec.capacity_bytes = self.downstream_capacity[1] + self.downstream_capacity = None self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(), stream_spec, self.send_wrapper) |