diff options
| author | Samuel O'Brien <sam.obrien@ni.com> | 2020-08-05 13:12:18 -0500 | 
|---|---|---|
| committer | Aaron Rossetto <aaron.rossetto@ni.com> | 2020-10-28 15:25:48 -0500 | 
| commit | 687ed5bba07559a314d56e70b0eb727c7c8d9cbf (patch) | |
| tree | 5f4669fccb2a740c29add489b6e8eef3d5466ea4 /mpm/python | |
| parent | e4517deef3f0ed18112c08ecbbd7baef9dedaa74 (diff) | |
| download | uhd-687ed5bba07559a314d56e70b0eb727c7c8d9cbf.tar.gz uhd-687ed5bba07559a314d56e70b0eb727c7c8d9cbf.tar.bz2 uhd-687ed5bba07559a314d56e70b0eb727c7c8d9cbf.zip | |
sim: Implement UHD > Simulator Flow Control
When sending data to the simulator, python simply cannot process the
data as fast as UHD can send it. Flow control ensures that uhd doesn't
overwhelm the simulator. Simulator > UHD flow control isn't implemented
yet.
Signed-off-by: Samuel O'Brien <sam.obrien@ni.com>
Diffstat (limited to 'mpm/python')
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_stream.py | 65 | 
1 files changed, 47 insertions, 18 deletions
| diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index 7147a98b5..df7f613a4 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -22,13 +22,27 @@ class XferCount:          self.num_bytes = 0          self.num_packets = 0 -    def count_packet(self, length): -        """Accounts for a packet of len bytes in the xfer count""" -        self.num_bytes += length +    @classmethod +    def from_strc(cls, payload): +        """Construct an XferCount from a Strc Payload""" +        xfer = cls() +        xfer.num_packets = payload.num_pkts +        xfer.num_bytes = payload.num_bytes +        return xfer + +    def has_exceeded(self, limit): +        """returns true if this XferCount >= the limit +        in either packets or bytes +        """ +        return self.num_packets >= limit.num_packets or self.num_bytes >= limit.num_bytes + +    def count_packet(self, len): +        """Account for a len bytes sized packet transfer""" +        self.num_bytes += len          self.num_packets += 1      def clear(self): -        """Reset the xfer counts to 0""" +        """Reset the counts to zero"""          self.num_bytes = 0          self.num_packets = 0 @@ -85,7 +99,7 @@ class ChdrInputStream:      packets into the sample_sink and responds to the STRC packets using      the send_wrapper      """ -    CAPACITY_BYTES = int(5e6) # 5 MB +    CAPACITY_BYTES = int(5e3) # 5 KB      QUEUE_CAP = 3      def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid):          self.log = log @@ -113,24 +127,36 @@ class ChdrInputStream:              if self.stop:                  break              header = packet.get_header() +            self.xfer.count_packet(recv_len) +            self.accum.count_packet(recv_len)              pkt_type = header.pkt_type              if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS): -                self.xfer.count_packet(recv_len)                  self.sample_sink.accept_packet(packet)              elif pkt_type == PacketType.STRC:                  req_payload = packet.get_payload_strc() -                resp_header = ChdrHeader() -                resp_header.dst_epid = req_payload.src_epid -                if req_payload.op_code == StrcOpCode.INIT: -                    self.xfer.clear() -                elif req_payload.op_code == StrcOpCode.RESYNC: -                    self.xfer.num_bytes = req_payload.xfer_count_bytes -                    self.xfer.num_packets = req_payload.xfer_count_pkts -                resp_payload = self._generate_strs_payload(header.dst_epid) -                resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) +                # Ping doesn't set a new target, just checks status +                if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT): +                    if req_payload.op_code == StrcOpCode.INIT: +                        self.xfer.clear() +                        self.fc_freq = XferCount.from_strc(req_payload) +                        self.command_addr = addr +                        self.command_epid = req_payload.src_epid +                    else: +                        self.xfer = XferCount.from_strc(req_payload) +                resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid)                  self.send_wrapper.send_packet(resp_packet, addr)              else:                  raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type)) + +            # Check if a fc status packet is due +            if self.fc_freq is not None and self.accum.has_exceeded(self.fc_freq): +                self.accum.clear() +                self.log.trace("Flow Control Due, sending STRS") +                self.command_target = None +                resp_packet = self._generate_strs_packet(self.command_epid, self.our_epid) +                self.log.trace("Sending Flow Control: {}".format(resp_packet.to_string_with_payload())) +                self.send_wrapper.send_packet(resp_packet, self.command_addr) +          self.sample_sink.close()          self.log.info("Stream RX Worker Done") @@ -141,8 +167,10 @@ class ChdrInputStream:          self.stop = True          self.rx_queue.put((None, None, None)) -    def _generate_strs_payload(self, src_epid): -        """Create an strs payload from the information in self.xfer""" +    def _generate_strs_packet(self, dst_epid, src_epid): +        """Create an strs packet from the information in self.xfer""" +        resp_header = ChdrHeader() +        resp_header.dst_epid = dst_epid          resp_payload = StrsPayload()          resp_payload.src_epid = src_epid          resp_payload.status = StrsStatus.OKAY @@ -150,7 +178,8 @@ class ChdrInputStream:          resp_payload.capacity_pkts = 0xFFFFFF          resp_payload.xfer_count_bytes = self.xfer.num_bytes          resp_payload.xfer_count_pkts = self.xfer.num_packets -        return resp_payload +        resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) +        return resp_packet      def queue_packet(self, packet, recv_len, addr):          """Queue a packet to be processed by the ChdrInputStream""" | 
