diff options
Diffstat (limited to 'mpm/python/usrp_mpm/simulator/chdr_stream.py')
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_stream.py | 83 |
1 files changed, 64 insertions, 19 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index df7f613a4..b8a59bb36 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -12,7 +12,7 @@ import time from threading import Thread import queue import socket -from uhd.chdr import PacketType, StrcOpCode, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket +from uhd.chdr import PacketType, StrcOpCode, StrcPayload, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket class XferCount: """This class keeps track of flow control transfer status which are @@ -30,6 +30,13 @@ class XferCount: xfer.num_bytes = payload.num_bytes return xfer + @classmethod + def from_strs(cls, payload): + xfer = cls() + xfer.num_packets = payload.xfer_count_pkts + xfer.num_bytes = payload.xfer_count_bytes + return xfer + def has_exceeded(self, limit): """returns true if this XferCount >= the limit in either packets or bytes @@ -134,15 +141,14 @@ class ChdrInputStream: self.sample_sink.accept_packet(packet) elif pkt_type == PacketType.STRC: req_payload = packet.get_payload_strc() - # Ping doesn't set a new target, just checks status - if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT): - if req_payload.op_code == StrcOpCode.INIT: - self.xfer.clear() - self.fc_freq = XferCount.from_strc(req_payload) - self.command_addr = addr - self.command_epid = req_payload.src_epid - else: - self.xfer = XferCount.from_strc(req_payload) + # Ping doesn't change anything, just requests a stream status packet + if req_payload.op_code == StrcOpCode.INIT: + self.xfer.clear() + self.fc_freq = XferCount.from_strc(req_payload) + self.command_addr = addr + self.command_epid = req_payload.src_epid + elif req_payload.op_code == StrcOpCode.RESYNC: + self.xfer = XferCount.from_strc(req_payload) resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid) self.send_wrapper.send_packet(resp_packet, addr) else: @@ -200,34 +206,44 @@ class ChdrOutputStream: self.stream_spec = stream_spec self.send_wrapper = send_wrapper self.xfer = XferCount() + self.recv = XferCount() self.stop = False + self.strs_queue = queue.Queue(100) + self.strc_seq_num = 0 + self.data_seq_num = 0 + self.thread = Thread(target=self._tx_worker, daemon=True) self.thread.start() def _tx_worker(self): self.log.info("Stream TX Worker Starting with {} packets/sec" .format(1/self.stream_spec.seconds_per_packet())) + self.log.info("Downstream Buffer Capacity: {} packets or {} bytes" + .format(self.stream_spec.capacity_packets, self.stream_spec.capacity_bytes)) header = ChdrHeader() start_time = time.time() next_send = start_time header.dst_epid = self.stream_spec.dst_epid header.pkt_type = PacketType.DATA_NO_TS + is_continuous = self.stream_spec.is_continuous num_samps_left = None - if not self.stream_spec.is_continuous: + if not is_continuous: + # TODO: Put sample format/width in the stream spec num_samps_left = self.stream_spec.total_samples * 4 # SC16 is 4 bytes per sample - for seq_num in self.stream_spec.seq_num_iter(): + timestamp = self.stream_spec.init_timestamp \ + if self.stream_spec.is_timed else None + + while is_continuous or num_samps_left > 0: if self.stop: self.log.info("Stream Worker Stopped") break - if num_samps_left == 0: - break - header.seq_num = seq_num - timestamp = self.stream_spec.init_timestamp \ - if seq_num == 0 and self.stream_spec.is_timed \ - else None - packet = ChdrPacket(self.chdr_w, header, bytes(0)) + header.seq_num = self.data_seq_num + # When seq_num gets to 65535 (Max Unsigned 16 bit integer) + # It wraps back around to 0 + self.data_seq_num = int(self.data_seq_num + 1) & 0xFFFF + packet = ChdrPacket(self.chdr_w, header, bytes(0), timestamp) packet_samples = self.stream_spec.packet_samples if num_samps_left is not None: packet_samples = min(packet_samples, num_samps_left) @@ -242,6 +258,14 @@ class ChdrOutputStream: time.sleep(delay) next_send = next_send + self.stream_spec.seconds_per_packet() + timestamp = None + + # Check Flow Control to assert there is space downstream + while not self._can_fit_packet(len(send_data)): + strs_update = self.strs_queue.get() + strs_payload = strs_update.get_payload_strs() + self._update_recv(strs_payload) + self.send_wrapper.send_data(send_data, self.stream_spec.addr) self.xfer.count_packet(len(send_data)) @@ -254,3 +278,24 @@ class ChdrOutputStream: def finish(self): """Stops the ChdrOutputStream""" self.stop = True + + def queue_packet(self, packet): + """ Place an incoming STRS packet in the Queue """ + self.strs_queue.put_nowait(packet) + + def _can_fit_packet(self, length): + """ Can the downstream buffer fit a packet of length right now """ + packets_in_transit = self.xfer.num_packets - self.recv.num_packets + space_packets = self.stream_spec.capacity_packets - packets_in_transit + bytes_in_transit = self.xfer.num_bytes - self.recv.num_bytes + space_bytes = self.stream_spec.capacity_bytes - bytes_in_transit + return space_packets > 0 and space_bytes >= length + + def _update_recv(self, strs_payload): + """ Update the Xfer counts for downstream receive using + an incoming strs payload + """ + assert strs_payload.status == StrsStatus.OKAY, \ + "Flow Control Error: STRS Status is {}".format(strs_payload.status) + self.recv.num_packets = strs_payload.xfer_count_pkts + self.recv.num_bytes = strs_payload.xfer_count_bytes |