aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mpm/python/usrp_mpm/simulator/chdr_stream.py65
1 files changed, 47 insertions, 18 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py
index 7147a98b5..df7f613a4 100644
--- a/mpm/python/usrp_mpm/simulator/chdr_stream.py
+++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py
@@ -22,13 +22,27 @@ class XferCount:
self.num_bytes = 0
self.num_packets = 0
- def count_packet(self, length):
- """Accounts for a packet of len bytes in the xfer count"""
- self.num_bytes += length
+ @classmethod
+ def from_strc(cls, payload):
+ """Construct an XferCount from a Strc Payload"""
+ xfer = cls()
+ xfer.num_packets = payload.num_pkts
+ xfer.num_bytes = payload.num_bytes
+ return xfer
+
+ def has_exceeded(self, limit):
+ """returns true if this XferCount >= the limit
+ in either packets or bytes
+ """
+ return self.num_packets >= limit.num_packets or self.num_bytes >= limit.num_bytes
+
+ def count_packet(self, len):
+ """Account for a len bytes sized packet transfer"""
+ self.num_bytes += len
self.num_packets += 1
def clear(self):
- """Reset the xfer counts to 0"""
+ """Reset the counts to zero"""
self.num_bytes = 0
self.num_packets = 0
@@ -85,7 +99,7 @@ class ChdrInputStream:
packets into the sample_sink and responds to the STRC packets using
the send_wrapper
"""
- CAPACITY_BYTES = int(5e6) # 5 MB
+ CAPACITY_BYTES = int(5e3) # 5 KB
QUEUE_CAP = 3
def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid):
self.log = log
@@ -113,24 +127,36 @@ class ChdrInputStream:
if self.stop:
break
header = packet.get_header()
+ self.xfer.count_packet(recv_len)
+ self.accum.count_packet(recv_len)
pkt_type = header.pkt_type
if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS):
- self.xfer.count_packet(recv_len)
self.sample_sink.accept_packet(packet)
elif pkt_type == PacketType.STRC:
req_payload = packet.get_payload_strc()
- resp_header = ChdrHeader()
- resp_header.dst_epid = req_payload.src_epid
- if req_payload.op_code == StrcOpCode.INIT:
- self.xfer.clear()
- elif req_payload.op_code == StrcOpCode.RESYNC:
- self.xfer.num_bytes = req_payload.xfer_count_bytes
- self.xfer.num_packets = req_payload.xfer_count_pkts
- resp_payload = self._generate_strs_payload(header.dst_epid)
- resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload)
+ # Ping doesn't set a new target, just checks status
+ if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT):
+ if req_payload.op_code == StrcOpCode.INIT:
+ self.xfer.clear()
+ self.fc_freq = XferCount.from_strc(req_payload)
+ self.command_addr = addr
+ self.command_epid = req_payload.src_epid
+ else:
+ self.xfer = XferCount.from_strc(req_payload)
+ resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid)
self.send_wrapper.send_packet(resp_packet, addr)
else:
raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type))
+
+ # Check if a fc status packet is due
+ if self.fc_freq is not None and self.accum.has_exceeded(self.fc_freq):
+ self.accum.clear()
+ self.log.trace("Flow Control Due, sending STRS")
+ self.command_target = None
+ resp_packet = self._generate_strs_packet(self.command_epid, self.our_epid)
+ self.log.trace("Sending Flow Control: {}".format(resp_packet.to_string_with_payload()))
+ self.send_wrapper.send_packet(resp_packet, self.command_addr)
+
self.sample_sink.close()
self.log.info("Stream RX Worker Done")
@@ -141,8 +167,10 @@ class ChdrInputStream:
self.stop = True
self.rx_queue.put((None, None, None))
- def _generate_strs_payload(self, src_epid):
- """Create an strs payload from the information in self.xfer"""
+ def _generate_strs_packet(self, dst_epid, src_epid):
+ """Create an strs packet from the information in self.xfer"""
+ resp_header = ChdrHeader()
+ resp_header.dst_epid = dst_epid
resp_payload = StrsPayload()
resp_payload.src_epid = src_epid
resp_payload.status = StrsStatus.OKAY
@@ -150,7 +178,8 @@ class ChdrInputStream:
resp_payload.capacity_pkts = 0xFFFFFF
resp_payload.xfer_count_bytes = self.xfer.num_bytes
resp_payload.xfer_count_pkts = self.xfer.num_packets
- return resp_payload
+ resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload)
+ return resp_packet
def queue_packet(self, packet, recv_len, addr):
"""Queue a packet to be processed by the ChdrInputStream"""