aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/chdr_stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/chdr_stream.py')
-rw-r--r--mpm/python/usrp_mpm/simulator/chdr_stream.py83
1 files changed, 64 insertions, 19 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py
index df7f613a4..b8a59bb36 100644
--- a/mpm/python/usrp_mpm/simulator/chdr_stream.py
+++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py
@@ -12,7 +12,7 @@ import time
from threading import Thread
import queue
import socket
-from uhd.chdr import PacketType, StrcOpCode, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket
+from uhd.chdr import PacketType, StrcOpCode, StrcPayload, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket
class XferCount:
"""This class keeps track of flow control transfer status which are
@@ -30,6 +30,13 @@ class XferCount:
xfer.num_bytes = payload.num_bytes
return xfer
+ @classmethod
+ def from_strs(cls, payload):
+ xfer = cls()
+ xfer.num_packets = payload.xfer_count_pkts
+ xfer.num_bytes = payload.xfer_count_bytes
+ return xfer
+
def has_exceeded(self, limit):
"""returns true if this XferCount >= the limit
in either packets or bytes
@@ -134,15 +141,14 @@ class ChdrInputStream:
self.sample_sink.accept_packet(packet)
elif pkt_type == PacketType.STRC:
req_payload = packet.get_payload_strc()
- # Ping doesn't set a new target, just checks status
- if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT):
- if req_payload.op_code == StrcOpCode.INIT:
- self.xfer.clear()
- self.fc_freq = XferCount.from_strc(req_payload)
- self.command_addr = addr
- self.command_epid = req_payload.src_epid
- else:
- self.xfer = XferCount.from_strc(req_payload)
+ # Ping doesn't change anything, just requests a stream status packet
+ if req_payload.op_code == StrcOpCode.INIT:
+ self.xfer.clear()
+ self.fc_freq = XferCount.from_strc(req_payload)
+ self.command_addr = addr
+ self.command_epid = req_payload.src_epid
+ elif req_payload.op_code == StrcOpCode.RESYNC:
+ self.xfer = XferCount.from_strc(req_payload)
resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid)
self.send_wrapper.send_packet(resp_packet, addr)
else:
@@ -200,34 +206,44 @@ class ChdrOutputStream:
self.stream_spec = stream_spec
self.send_wrapper = send_wrapper
self.xfer = XferCount()
+ self.recv = XferCount()
self.stop = False
+ self.strs_queue = queue.Queue(100)
+ self.strc_seq_num = 0
+ self.data_seq_num = 0
+
self.thread = Thread(target=self._tx_worker, daemon=True)
self.thread.start()
def _tx_worker(self):
self.log.info("Stream TX Worker Starting with {} packets/sec"
.format(1/self.stream_spec.seconds_per_packet()))
+ self.log.info("Downstream Buffer Capacity: {} packets or {} bytes"
+ .format(self.stream_spec.capacity_packets, self.stream_spec.capacity_bytes))
header = ChdrHeader()
start_time = time.time()
next_send = start_time
header.dst_epid = self.stream_spec.dst_epid
header.pkt_type = PacketType.DATA_NO_TS
+ is_continuous = self.stream_spec.is_continuous
num_samps_left = None
- if not self.stream_spec.is_continuous:
+ if not is_continuous:
+ # TODO: Put sample format/width in the stream spec
num_samps_left = self.stream_spec.total_samples * 4 # SC16 is 4 bytes per sample
- for seq_num in self.stream_spec.seq_num_iter():
+ timestamp = self.stream_spec.init_timestamp \
+ if self.stream_spec.is_timed else None
+
+ while is_continuous or num_samps_left > 0:
if self.stop:
self.log.info("Stream Worker Stopped")
break
- if num_samps_left == 0:
- break
- header.seq_num = seq_num
- timestamp = self.stream_spec.init_timestamp \
- if seq_num == 0 and self.stream_spec.is_timed \
- else None
- packet = ChdrPacket(self.chdr_w, header, bytes(0))
+ header.seq_num = self.data_seq_num
+ # When seq_num gets to 65535 (Max Unsigned 16 bit integer)
+ # It wraps back around to 0
+ self.data_seq_num = int(self.data_seq_num + 1) & 0xFFFF
+ packet = ChdrPacket(self.chdr_w, header, bytes(0), timestamp)
packet_samples = self.stream_spec.packet_samples
if num_samps_left is not None:
packet_samples = min(packet_samples, num_samps_left)
@@ -242,6 +258,14 @@ class ChdrOutputStream:
time.sleep(delay)
next_send = next_send + self.stream_spec.seconds_per_packet()
+ timestamp = None
+
+ # Check Flow Control to assert there is space downstream
+ while not self._can_fit_packet(len(send_data)):
+ strs_update = self.strs_queue.get()
+ strs_payload = strs_update.get_payload_strs()
+ self._update_recv(strs_payload)
+
self.send_wrapper.send_data(send_data, self.stream_spec.addr)
self.xfer.count_packet(len(send_data))
@@ -254,3 +278,24 @@ class ChdrOutputStream:
def finish(self):
"""Stops the ChdrOutputStream"""
self.stop = True
+
+ def queue_packet(self, packet):
+ """ Place an incoming STRS packet in the Queue """
+ self.strs_queue.put_nowait(packet)
+
+ def _can_fit_packet(self, length):
+ """ Can the downstream buffer fit a packet of length right now """
+ packets_in_transit = self.xfer.num_packets - self.recv.num_packets
+ space_packets = self.stream_spec.capacity_packets - packets_in_transit
+ bytes_in_transit = self.xfer.num_bytes - self.recv.num_bytes
+ space_bytes = self.stream_spec.capacity_bytes - bytes_in_transit
+ return space_packets > 0 and space_bytes >= length
+
+ def _update_recv(self, strs_payload):
+ """ Update the Xfer counts for downstream receive using
+ an incoming strs payload
+ """
+ assert strs_payload.status == StrsStatus.OKAY, \
+ "Flow Control Error: STRS Status is {}".format(strs_payload.status)
+ self.recv.num_packets = strs_payload.xfer_count_pkts
+ self.recv.num_bytes = strs_payload.xfer_count_bytes