diff options
Diffstat (limited to 'mpm/python/usrp_mpm')
-rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_stream.py | 83 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/rfnoc_common.py | 15 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/simulator/stream_endpoint_node.py | 21 |
3 files changed, 85 insertions, 34 deletions
diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index df7f613a4..b8a59bb36 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -12,7 +12,7 @@ import time from threading import Thread import queue import socket -from uhd.chdr import PacketType, StrcOpCode, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket +from uhd.chdr import PacketType, StrcOpCode, StrcPayload, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket class XferCount: """This class keeps track of flow control transfer status which are @@ -30,6 +30,13 @@ class XferCount: xfer.num_bytes = payload.num_bytes return xfer + @classmethod + def from_strs(cls, payload): + xfer = cls() + xfer.num_packets = payload.xfer_count_pkts + xfer.num_bytes = payload.xfer_count_bytes + return xfer + def has_exceeded(self, limit): """returns true if this XferCount >= the limit in either packets or bytes @@ -134,15 +141,14 @@ class ChdrInputStream: self.sample_sink.accept_packet(packet) elif pkt_type == PacketType.STRC: req_payload = packet.get_payload_strc() - # Ping doesn't set a new target, just checks status - if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT): - if req_payload.op_code == StrcOpCode.INIT: - self.xfer.clear() - self.fc_freq = XferCount.from_strc(req_payload) - self.command_addr = addr - self.command_epid = req_payload.src_epid - else: - self.xfer = XferCount.from_strc(req_payload) + # Ping doesn't change anything, just requests a stream status packet + if req_payload.op_code == StrcOpCode.INIT: + self.xfer.clear() + self.fc_freq = XferCount.from_strc(req_payload) + self.command_addr = addr + self.command_epid = req_payload.src_epid + elif req_payload.op_code == StrcOpCode.RESYNC: + self.xfer = XferCount.from_strc(req_payload) resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid) self.send_wrapper.send_packet(resp_packet, addr) else: @@ -200,34 +206,44 @@ class ChdrOutputStream: self.stream_spec = stream_spec self.send_wrapper = send_wrapper self.xfer = XferCount() + self.recv = XferCount() self.stop = False + self.strs_queue = queue.Queue(100) + self.strc_seq_num = 0 + self.data_seq_num = 0 + self.thread = Thread(target=self._tx_worker, daemon=True) self.thread.start() def _tx_worker(self): self.log.info("Stream TX Worker Starting with {} packets/sec" .format(1/self.stream_spec.seconds_per_packet())) + self.log.info("Downstream Buffer Capacity: {} packets or {} bytes" + .format(self.stream_spec.capacity_packets, self.stream_spec.capacity_bytes)) header = ChdrHeader() start_time = time.time() next_send = start_time header.dst_epid = self.stream_spec.dst_epid header.pkt_type = PacketType.DATA_NO_TS + is_continuous = self.stream_spec.is_continuous num_samps_left = None - if not self.stream_spec.is_continuous: + if not is_continuous: + # TODO: Put sample format/width in the stream spec num_samps_left = self.stream_spec.total_samples * 4 # SC16 is 4 bytes per sample - for seq_num in self.stream_spec.seq_num_iter(): + timestamp = self.stream_spec.init_timestamp \ + if self.stream_spec.is_timed else None + + while is_continuous or num_samps_left > 0: if self.stop: self.log.info("Stream Worker Stopped") break - if num_samps_left == 0: - break - header.seq_num = seq_num - timestamp = self.stream_spec.init_timestamp \ - if seq_num == 0 and self.stream_spec.is_timed \ - else None - packet = ChdrPacket(self.chdr_w, header, bytes(0)) + header.seq_num = self.data_seq_num + # When seq_num gets to 65535 (Max Unsigned 16 bit integer) + # It wraps back around to 0 + self.data_seq_num = int(self.data_seq_num + 1) & 0xFFFF + packet = ChdrPacket(self.chdr_w, header, bytes(0), timestamp) packet_samples = self.stream_spec.packet_samples if num_samps_left is not None: packet_samples = min(packet_samples, num_samps_left) @@ -242,6 +258,14 @@ class ChdrOutputStream: time.sleep(delay) next_send = next_send + self.stream_spec.seconds_per_packet() + timestamp = None + + # Check Flow Control to assert there is space downstream + while not self._can_fit_packet(len(send_data)): + strs_update = self.strs_queue.get() + strs_payload = strs_update.get_payload_strs() + self._update_recv(strs_payload) + self.send_wrapper.send_data(send_data, self.stream_spec.addr) self.xfer.count_packet(len(send_data)) @@ -254,3 +278,24 @@ class ChdrOutputStream: def finish(self): """Stops the ChdrOutputStream""" self.stop = True + + def queue_packet(self, packet): + """ Place an incoming STRS packet in the Queue """ + self.strs_queue.put_nowait(packet) + + def _can_fit_packet(self, length): + """ Can the downstream buffer fit a packet of length right now """ + packets_in_transit = self.xfer.num_packets - self.recv.num_packets + space_packets = self.stream_spec.capacity_packets - packets_in_transit + bytes_in_transit = self.xfer.num_bytes - self.recv.num_bytes + space_bytes = self.stream_spec.capacity_bytes - bytes_in_transit + return space_packets > 0 and space_bytes >= length + + def _update_recv(self, strs_payload): + """ Update the Xfer counts for downstream receive using + an incoming strs payload + """ + assert strs_payload.status == StrsStatus.OKAY, \ + "Flow Control Error: STRS Status is {}".format(strs_payload.status) + self.recv.num_packets = strs_payload.xfer_count_pkts + self.recv.num_bytes = strs_payload.xfer_count_bytes diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_common.py b/mpm/python/usrp_mpm/simulator/rfnoc_common.py index d7f61a4c1..b1dab559f 100644 --- a/mpm/python/usrp_mpm/simulator/rfnoc_common.py +++ b/mpm/python/usrp_mpm/simulator/rfnoc_common.py @@ -153,6 +153,8 @@ class StreamSpec: self.sample_rate = None self.dst_epid = None self.addr = None + self.capacity_packets = 0 + self.capacity_bytes = 0 def set_timestamp_lo(self, low): """Set the low 32 bits of the initial timestamp""" @@ -182,19 +184,6 @@ class StreamSpec: assert self.sample_rate != 0 return self.packet_samples / self.sample_rate - def seq_num_iter(self): - """Returns a generator which returns an incrementing integer - for each packet that should be sent. This is useful to set the - seq_num of each transmitted packet. - """ - i = 0 - while True: - if not self.is_continuous: - if i >= self.total_samples: - return - yield i - i += 1 - def __str__(self): return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \ "sample_rate: {}, dst_epid: {}, addr: {}}}" \ diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py index b4477ee47..06aaaa046 100644 --- a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py +++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py @@ -8,7 +8,7 @@ This is a stream endpoint node, which is used in the RFNoCGraph class. It also houses the logic to create output and input streams. """ from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \ - ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket + ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED from .chdr_stream import ChdrOutputStream, ChdrInputStream @@ -39,6 +39,8 @@ class StreamEndpointNode(Node): self.dst_to_addr = None self.source_gen = source_gen self.sink_gen = sink_gen + self.downstream_capacity = None + self.strs_handlers = {} self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid, self.ctrl_status_callback_out, self.ctrl_status_callback_in, 4, 4 * 8000) @@ -52,6 +54,12 @@ class StreamEndpointNode(Node): self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid)) addr = self.dst_to_addr(self) self.send_strc(addr) + def handle_initial_strs(strs_packet): + payload = strs_packet.get_payload_strs() + assert payload.status == StrsStatus.OKAY, \ + "Received STRS Packet Err: {}".format(payload.status) + self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes) + self.strs_handlers[self.epid] = handle_initial_strs return STRM_STATUS_FC_ENABLED def ctrl_status_callback_in(self, status): @@ -141,7 +149,11 @@ class StreamEndpointNode(Node): self.input_stream.queue_packet(packet, num_bytes, sender) def _handle_strs_packet(self, packet, **kwargs): - pass # TODO: Implement flow control for output streams + header = packet.get_header() + if header.dst_epid in self.strs_handlers: + self.strs_handlers.pop(header.dst_epid)(packet) + else: + self.output_stream.queue_packet(packet) def _handle_strc_packet(self, packet, **kwargs): self._handle_data_packet(packet, **kwargs) @@ -160,6 +172,8 @@ class StreamEndpointNode(Node): payload = StrcPayload() payload.src_epid = self.epid payload.op_code = StrcOpCode.INIT + payload.num_pkts = 10 + payload.num_bytes = 10 * 1024 # 10 KB packet = ChdrPacket(self.chdr_w, header, payload) self.send_wrapper.send_packet(packet, addr) @@ -175,6 +189,9 @@ class StreamEndpointNode(Node): assert self.output_stream is None, \ "Output Stream already running on epid: {}".format(self.epid) stream_spec.dst_epid = self.dst_epid + stream_spec.capacity_packets = self.downstream_capacity[0] + stream_spec.capacity_bytes = self.downstream_capacity[1] + self.downstream_capacity = None self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(), stream_spec, self.send_wrapper) |