aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm
diff options
context:
space:
mode:
authorSamuel O'Brien <sam.obrien@ni.com>2020-08-05 13:12:18 -0500
committerAaron Rossetto <aaron.rossetto@ni.com>2020-10-28 15:25:48 -0500
commit687ed5bba07559a314d56e70b0eb727c7c8d9cbf (patch)
tree5f4669fccb2a740c29add489b6e8eef3d5466ea4 /mpm/python/usrp_mpm
parente4517deef3f0ed18112c08ecbbd7baef9dedaa74 (diff)
downloaduhd-687ed5bba07559a314d56e70b0eb727c7c8d9cbf.tar.gz
uhd-687ed5bba07559a314d56e70b0eb727c7c8d9cbf.tar.bz2
uhd-687ed5bba07559a314d56e70b0eb727c7c8d9cbf.zip
sim: Implement UHD > Simulator Flow Control
When sending data to the simulator, python simply cannot process the data as fast as UHD can send it. Flow control ensures that uhd doesn't overwhelm the simulator. Simulator > UHD flow control isn't implemented yet. Signed-off-by: Samuel O'Brien <sam.obrien@ni.com>
Diffstat (limited to 'mpm/python/usrp_mpm')
-rw-r--r--mpm/python/usrp_mpm/simulator/chdr_stream.py65
1 files changed, 47 insertions, 18 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py
index 7147a98b5..df7f613a4 100644
--- a/mpm/python/usrp_mpm/simulator/chdr_stream.py
+++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py
@@ -22,13 +22,27 @@ class XferCount:
self.num_bytes = 0
self.num_packets = 0
- def count_packet(self, length):
- """Accounts for a packet of len bytes in the xfer count"""
- self.num_bytes += length
+ @classmethod
+ def from_strc(cls, payload):
+ """Construct an XferCount from a Strc Payload"""
+ xfer = cls()
+ xfer.num_packets = payload.num_pkts
+ xfer.num_bytes = payload.num_bytes
+ return xfer
+
+ def has_exceeded(self, limit):
+ """returns true if this XferCount >= the limit
+ in either packets or bytes
+ """
+ return self.num_packets >= limit.num_packets or self.num_bytes >= limit.num_bytes
+
+ def count_packet(self, len):
+ """Account for a len bytes sized packet transfer"""
+ self.num_bytes += len
self.num_packets += 1
def clear(self):
- """Reset the xfer counts to 0"""
+ """Reset the counts to zero"""
self.num_bytes = 0
self.num_packets = 0
@@ -85,7 +99,7 @@ class ChdrInputStream:
packets into the sample_sink and responds to the STRC packets using
the send_wrapper
"""
- CAPACITY_BYTES = int(5e6) # 5 MB
+ CAPACITY_BYTES = int(5e3) # 5 KB
QUEUE_CAP = 3
def __init__(self, log, chdr_w, sample_sink, send_wrapper, our_epid):
self.log = log
@@ -113,24 +127,36 @@ class ChdrInputStream:
if self.stop:
break
header = packet.get_header()
+ self.xfer.count_packet(recv_len)
+ self.accum.count_packet(recv_len)
pkt_type = header.pkt_type
if pkt_type in (PacketType.DATA_WITH_TS, PacketType.DATA_NO_TS):
- self.xfer.count_packet(recv_len)
self.sample_sink.accept_packet(packet)
elif pkt_type == PacketType.STRC:
req_payload = packet.get_payload_strc()
- resp_header = ChdrHeader()
- resp_header.dst_epid = req_payload.src_epid
- if req_payload.op_code == StrcOpCode.INIT:
- self.xfer.clear()
- elif req_payload.op_code == StrcOpCode.RESYNC:
- self.xfer.num_bytes = req_payload.xfer_count_bytes
- self.xfer.num_packets = req_payload.xfer_count_pkts
- resp_payload = self._generate_strs_payload(header.dst_epid)
- resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload)
+ # Ping doesn't set a new target, just checks status
+ if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT):
+ if req_payload.op_code == StrcOpCode.INIT:
+ self.xfer.clear()
+ self.fc_freq = XferCount.from_strc(req_payload)
+ self.command_addr = addr
+ self.command_epid = req_payload.src_epid
+ else:
+ self.xfer = XferCount.from_strc(req_payload)
+ resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid)
self.send_wrapper.send_packet(resp_packet, addr)
else:
raise RuntimeError("RX Worker received unsupported packet: {}".format(pkt_type))
+
+ # Check if a fc status packet is due
+ if self.fc_freq is not None and self.accum.has_exceeded(self.fc_freq):
+ self.accum.clear()
+ self.log.trace("Flow Control Due, sending STRS")
+ self.command_target = None
+ resp_packet = self._generate_strs_packet(self.command_epid, self.our_epid)
+ self.log.trace("Sending Flow Control: {}".format(resp_packet.to_string_with_payload()))
+ self.send_wrapper.send_packet(resp_packet, self.command_addr)
+
self.sample_sink.close()
self.log.info("Stream RX Worker Done")
@@ -141,8 +167,10 @@ class ChdrInputStream:
self.stop = True
self.rx_queue.put((None, None, None))
- def _generate_strs_payload(self, src_epid):
- """Create an strs payload from the information in self.xfer"""
+ def _generate_strs_packet(self, dst_epid, src_epid):
+ """Create an strs packet from the information in self.xfer"""
+ resp_header = ChdrHeader()
+ resp_header.dst_epid = dst_epid
resp_payload = StrsPayload()
resp_payload.src_epid = src_epid
resp_payload.status = StrsStatus.OKAY
@@ -150,7 +178,8 @@ class ChdrInputStream:
resp_payload.capacity_pkts = 0xFFFFFF
resp_payload.xfer_count_bytes = self.xfer.num_bytes
resp_payload.xfer_count_pkts = self.xfer.num_packets
- return resp_payload
+ resp_packet = ChdrPacket(self.chdr_w, resp_header, resp_payload)
+ return resp_packet
def queue_packet(self, packet, recv_len, addr):
"""Queue a packet to be processed by the ChdrInputStream"""