From 687ed5bba07559a314d56e70b0eb727c7c8d9cbf Mon Sep 17 00:00:00 2001 From: Samuel O'Brien Date: Wed, 5 Aug 2020 13:12:18 -0500 Subject: sim: Implement UHD > Simulator Flow Control When sending data to the simulator, python simply cannot process the data as fast as UHD can send it. Flow control ensures that uhd doesn't overwhelm the simulator. Simulator > UHD flow control isn't implemented yet. Signed-off-by: Samuel O'Brien --- mpm/python/usrp_mpm/simulator/chdr_stream.py | 65 ++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 18 deletions(-) (limited to 'mpm') diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index 7147a98b5..df7f613a4 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -22,13 +22,27 @@ class XferCount: self.num_bytes = 0 self.num_packets = 0 - def count_packet(self, length): - """Accounts for a packet of len bytes in the xfer count""" - self.num_bytes += length + @classmethod + def from_strc(cls, payload): + """Construct an XferCount from a Strc Payload""" + xfer = cls() + xfer.num_packets = payload.num_pkts + xfer.num_bytes = payload.num_bytes + return xfer + + def has_exceeded(self, limit): + """returns true if this XferCount >= the limit + in either packets or bytes + """ + return self.num_packets >= limit.num_packets or self.num_bytes >= limit.num_bytes + + def count_packet(self, len): + """Account for a len bytes sized packet transfer""" + self.num_bytes += len self.num_packets += 1 def clear(self): - """Reset the xfer counts to 0""" + """Reset the counts to zero""" self.num_bytes = 0 self.num_packets = 0 @@ -85,7 +99,7 @@ class ChdrInputStream: packets into the sample_sink and responds to the STRC packets using the send_wrapper """ - CAPACITY_BYTES = int(5e6) # 5 MB + CAPACITY_BYTES = int(5e3) # 5 KB QUEUE_CAP = 3 def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid): self.log = log @@ -113,24 +127,36 @@ class ChdrInputStream: if self.stop: break header = packet.get_header() + self.xfer.count_packet(recv_len) + self.accum.count_packet(recv_len) pkt_type = header.pkt_type if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS): - self.xfer.count_packet(recv_len) self.sample_sink.accept_packet(packet) elif pkt_type == PacketType.STRC: req_payload = packet.get_payload_strc() - resp_header = ChdrHeader() - resp_header.dst_epid = req_payload.src_epid - if req_payload.op_code == StrcOpCode.INIT: - self.xfer.clear() - elif req_payload.op_code == StrcOpCode.RESYNC: - self.xfer.num_bytes = req_payload.xfer_count_bytes - self.xfer.num_packets = req_payload.xfer_count_pkts - resp_payload = self._generate_strs_payload(header.dst_epid) - resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) + # Ping doesn't set a new target, just checks status + if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT): + if req_payload.op_code == StrcOpCode.INIT: + self.xfer.clear() + self.fc_freq = XferCount.from_strc(req_payload) + self.command_addr = addr + self.command_epid = req_payload.src_epid + else: + self.xfer = XferCount.from_strc(req_payload) + resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid) self.send_wrapper.send_packet(resp_packet, addr) else: raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type)) + + # Check if a fc status packet is due + if self.fc_freq is not None and self.accum.has_exceeded(self.fc_freq): + self.accum.clear() + self.log.trace("Flow Control Due, sending STRS") + self.command_target = None + resp_packet = self._generate_strs_packet(self.command_epid, self.our_epid) + self.log.trace("Sending Flow Control: {}".format(resp_packet.to_string_with_payload())) + self.send_wrapper.send_packet(resp_packet, self.command_addr) + self.sample_sink.close() self.log.info("Stream RX Worker Done") @@ -141,8 +167,10 @@ class ChdrInputStream: self.stop = True self.rx_queue.put((None, None, None)) - def _generate_strs_payload(self, src_epid): - """Create an strs payload from the information in self.xfer""" + def _generate_strs_packet(self, dst_epid, src_epid): + """Create an strs packet from the information in self.xfer""" + resp_header = ChdrHeader() + resp_header.dst_epid = dst_epid resp_payload = StrsPayload() resp_payload.src_epid = src_epid resp_payload.status = StrsStatus.OKAY @@ -150,7 +178,8 @@ class ChdrInputStream: resp_payload.capacity_pkts = 0xFFFFFF resp_payload.xfer_count_bytes = self.xfer.num_bytes resp_payload.xfer_count_pkts = self.xfer.num_packets - return resp_payload + resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload) + return resp_packet def queue_packet(self, packet, recv_len, addr): """Queue a packet to be processed by the ChdrInputStream""" -- cgit v1.2.3