diff options
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_stream.py | 65 |
1 files changed, 47 insertions, 18 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index 7147a98b5..df7f613a4 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -22,13 +22,27 @@ class XferCount: self.num_bytes = 0 self.num_packets = 0 - def count_packet(self, length): - """Accounts for a packet of len bytes in the xfer count""" - self.num_bytes += length + @classmethod + def from_strc(cls, payload): + """Construct an XferCount from a Strc Payload""" + xfer = cls() + xfer.num_packets = payload.num_pkts + xfer.num_bytes = payload.num_bytes + return xfer + + def has_exceeded(self, limit): + """returns true if this XferCount >= the limit + in either packets or bytes + """ + return self.num_packets >= limit.num_packets or self.num_bytes >= limit.num_bytes + + def count_packet(self, len): + """Account for a len bytes sized packet transfer""" + self.num_bytes += len self.num_packets += 1 def clear(self): - """Reset the xfer counts to 0""" + """Reset the counts to zero""" self.num_bytes = 0 self.num_packets = 0 @@ -85,7 +99,7 @@ class ChdrInputStream: packets into the sample_sink and responds to the STRC packets using the send_wrapper """ - CAPACITY_BYTES = int(5e6) # 5 MB + CAPACITY_BYTES = int(5e3) # 5 KB QUEUE_CAP = 3 def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid): self.log = log @@ -113,24 +127,36 @@ class ChdrInputStream: if self.stop: break header = packet.get_header() + self.xfer.count_packet(recv_len) + self.accum.count_packet(recv_len) pkt_type = header.pkt_type if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS): - self.xfer.count_packet(recv_len) self.sample_sink.accept_packet(packet) elif pkt_type == PacketType.STRC: req_payload = packet.get_payload_strc() - resp_header = ChdrHeader() - resp_header.dst_epid = req_payload.src_epid - if req_payload.op_code == StrcOpCode.INIT: - self.xfer.clear() - elif req_payload.op_code == StrcOpCode.RESYNC: - self.xfer.num_bytes = req_payload.xfer_count_bytes - self.xfer.num_packets = req_payload.xfer_count_pkts - resp_payload = self._generate_strs_payload(header.dst_epid) - resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) + # Ping doesn't set a new target, just checks status + if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT): + if req_payload.op_code == StrcOpCode.INIT: + self.xfer.clear() + self.fc_freq = XferCount.from_strc(req_payload) + self.command_addr = addr + self.command_epid = req_payload.src_epid + else: + self.xfer = XferCount.from_strc(req_payload) + resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid) self.send_wrapper.send_packet(resp_packet, addr) else: raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type)) + + # Check if a fc status packet is due + if self.fc_freq is not None and self.accum.has_exceeded(self.fc_freq): + self.accum.clear() + self.log.trace("Flow Control Due, sending STRS") + self.command_target = None + resp_packet = self._generate_strs_packet(self.command_epid, self.our_epid) + self.log.trace("Sending Flow Control: {}".format(resp_packet.to_string_with_payload())) + self.send_wrapper.send_packet(resp_packet, self.command_addr) + self.sample_sink.close() self.log.info("Stream RX Worker Done") @@ -141,8 +167,10 @@ class ChdrInputStream: self.stop = True self.rx_queue.put((None, None, None)) - def _generate_strs_payload(self, src_epid): - """Create an strs payload from the information in self.xfer""" + def _generate_strs_packet(self, dst_epid, src_epid): + """Create an strs packet from the information in self.xfer""" + resp_header = ChdrHeader() + resp_header.dst_epid = dst_epid resp_payload = StrsPayload() resp_payload.src_epid = src_epid resp_payload.status = StrsStatus.OKAY @@ -150,7 +178,8 @@ class ChdrInputStream: resp_payload.capacity_pkts = 0xFFFFFF resp_payload.xfer_count_bytes = self.xfer.num_bytes resp_payload.xfer_count_pkts = self.xfer.num_packets - return resp_payload + resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) + return resp_packet def queue_packet(self, packet, recv_len, addr): """Queue a packet to be processed by the ChdrInputStream""" |