diff options
Diffstat (limited to 'mpm/python/usrp_mpm/simulator')
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_stream.py | 65 | 
1 files changed, 47 insertions, 18 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index 7147a98b5..df7f613a4 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -22,13 +22,27 @@ class XferCount:          self.num_bytes = 0          self.num_packets = 0 -    def count_packet(self, length): -        """Accounts for a packet of len bytes in the xfer count""" -        self.num_bytes += length +    @classmethod +    def from_strc(cls, payload): +        """Construct an XferCount from a Strc Payload""" +        xfer = cls() +        xfer.num_packets = payload.num_pkts +        xfer.num_bytes = payload.num_bytes +        return xfer + +    def has_exceeded(self, limit): +        """returns true if this XferCount >= the limit +        in either packets or bytes +        """ +        return self.num_packets >= limit.num_packets or self.num_bytes >= limit.num_bytes + +    def count_packet(self, len): +        """Account for a len bytes sized packet transfer""" +        self.num_bytes += len          self.num_packets += 1      def clear(self): -        """Reset the xfer counts to 0""" +        """Reset the counts to zero"""          self.num_bytes = 0          self.num_packets = 0 @@ -85,7 +99,7 @@ class ChdrInputStream:      packets into the sample_sink and responds to the STRC packets using      the send_wrapper      """ -    CAPACITY_BYTES = int(5e6) # 5 MB +    CAPACITY_BYTES = int(5e3) # 5 KB      QUEUE_CAP = 3      def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid):          self.log = log @@ -113,24 +127,36 @@ class ChdrInputStream:              if self.stop:                  break              header = packet.get_header() +            self.xfer.count_packet(recv_len) +            self.accum.count_packet(recv_len)              pkt_type = header.pkt_type              if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS): -                self.xfer.count_packet(recv_len)                  self.sample_sink.accept_packet(packet)              elif pkt_type == PacketType.STRC:                  req_payload = packet.get_payload_strc() -                resp_header = ChdrHeader() -                resp_header.dst_epid = req_payload.src_epid -                if req_payload.op_code == StrcOpCode.INIT: -                    self.xfer.clear() -                elif req_payload.op_code == StrcOpCode.RESYNC: -                    self.xfer.num_bytes = req_payload.xfer_count_bytes -                    self.xfer.num_packets = req_payload.xfer_count_pkts -                resp_payload = self._generate_strs_payload(header.dst_epid) -                resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) +                # Ping doesn't set a new target, just checks status +                if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT): +                    if req_payload.op_code == StrcOpCode.INIT: +                        self.xfer.clear() +                        self.fc_freq = XferCount.from_strc(req_payload) +                        self.command_addr = addr +                        self.command_epid = req_payload.src_epid +                    else: +                        self.xfer = XferCount.from_strc(req_payload) +                resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid)                  self.send_wrapper.send_packet(resp_packet, addr)              else:                  raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type)) + +            # Check if a fc status packet is due +            if self.fc_freq is not None and self.accum.has_exceeded(self.fc_freq): +                self.accum.clear() +                self.log.trace("Flow Control Due, sending STRS") +                self.command_target = None +                resp_packet = self._generate_strs_packet(self.command_epid, self.our_epid) +                self.log.trace("Sending Flow Control: {}".format(resp_packet.to_string_with_payload())) +                self.send_wrapper.send_packet(resp_packet, self.command_addr) +          self.sample_sink.close()          self.log.info("Stream RX Worker Done") @@ -141,8 +167,10 @@ class ChdrInputStream:          self.stop = True          self.rx_queue.put((None, None, None)) -    def _generate_strs_payload(self, src_epid): -        """Create an strs payload from the information in self.xfer""" +    def _generate_strs_packet(self, dst_epid, src_epid): +        """Create an strs packet from the information in self.xfer""" +        resp_header = ChdrHeader() +        resp_header.dst_epid = dst_epid          resp_payload = StrsPayload()          resp_payload.src_epid = src_epid          resp_payload.status = StrsStatus.OKAY @@ -150,7 +178,8 @@ class ChdrInputStream:          resp_payload.capacity_pkts = 0xFFFFFF          resp_payload.xfer_count_bytes = self.xfer.num_bytes          resp_payload.xfer_count_pkts = self.xfer.num_packets -        return resp_payload +        resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) +        return resp_packet      def queue_packet(self, packet, recv_len, addr):          """Queue a packet to be processed by the ChdrInputStream"""  | 
