diff options
| author | Samuel O'Brien <sam.obrien@ni.com> | 2020-08-04 14:48:34 -0500 | 
|---|---|---|
| committer | Aaron Rossetto <aaron.rossetto@ni.com> | 2020-10-28 15:25:48 -0500 | 
| commit | a56f4f63e235c7511cccfc291d395481828703f5 (patch) | |
| tree | c4b7bc71417bc5bdacc91f6a0f5d3fd9cabca62e /mpm/python | |
| parent | 5df8202c0cb88fb5c7629fbf0ce5bed32c96e70d (diff) | |
| download | uhd-a56f4f63e235c7511cccfc291d395481828703f5.tar.gz uhd-a56f4f63e235c7511cccfc291d395481828703f5.tar.bz2 uhd-a56f4f63e235c7511cccfc291d395481828703f5.zip | |
sim: Implement Sim > UHD Flow Control
This commit adds flow control support when streaming data from the
Simulator to UHD. It no longer ignores STRS packets.
Signed-off-by: Samuel O'Brien <sam.obrien@ni.com>
Diffstat (limited to 'mpm/python')
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/chdr_stream.py | 83 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/rfnoc_common.py | 15 | ||||
| -rw-r--r-- | mpm/python/usrp_mpm/simulator/stream_endpoint_node.py | 21 | 
3 files changed, 85 insertions, 34 deletions
| diff --git a/mpm/python/usrp_mpm/simulator/chdr_stream.py b/mpm/python/usrp_mpm/simulator/chdr_stream.py index df7f613a4..b8a59bb36 100644 --- a/mpm/python/usrp_mpm/simulator/chdr_stream.py +++ b/mpm/python/usrp_mpm/simulator/chdr_stream.py @@ -12,7 +12,7 @@ import time  from threading import Thread  import queue  import socket -from uhd.chdr import PacketType, StrcOpCode, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket +from uhd.chdr import PacketType, StrcOpCode, StrcPayload, StrsPayload, StrsStatus, ChdrHeader, ChdrPacket  class XferCount:      """This class keeps track of flow control transfer status which are @@ -30,6 +30,13 @@ class XferCount:          xfer.num_bytes = payload.num_bytes          return xfer +    @classmethod +    def from_strs(cls, payload): +        xfer = cls() +        xfer.num_packets = payload.xfer_count_pkts +        xfer.num_bytes = payload.xfer_count_bytes +        return xfer +      def has_exceeded(self, limit):          """returns true if this XferCount >= the limit          in either packets or bytes @@ -134,15 +141,14 @@ class ChdrInputStream:                  self.sample_sink.accept_packet(packet)              elif pkt_type == PacketType.STRC:                  req_payload = packet.get_payload_strc() -                # Ping doesn't set a new target, just checks status -                if req_payload.op_code in (StrcOpCode.RESYNC, StrcOpCode.INIT): -                    if req_payload.op_code == StrcOpCode.INIT: -                        self.xfer.clear() -                        self.fc_freq = XferCount.from_strc(req_payload) -                        self.command_addr = addr -                        self.command_epid = req_payload.src_epid -                    else: -                        self.xfer = XferCount.from_strc(req_payload) +                # Ping doesn't change anything, just requests a stream status packet +                if req_payload.op_code == StrcOpCode.INIT: +                    self.xfer.clear() +                    self.fc_freq = XferCount.from_strc(req_payload) +                    self.command_addr = addr +                    self.command_epid = req_payload.src_epid +                elif req_payload.op_code == StrcOpCode.RESYNC: +                    self.xfer = XferCount.from_strc(req_payload)                  resp_packet = self._generate_strs_packet(req_payload.src_epid, self.our_epid)                  self.send_wrapper.send_packet(resp_packet, addr)              else: @@ -200,34 +206,44 @@ class ChdrOutputStream:          self.stream_spec = stream_spec          self.send_wrapper = send_wrapper          self.xfer = XferCount() +        self.recv = XferCount()          self.stop = False +        self.strs_queue = queue.Queue(100) +        self.strc_seq_num = 0 +        self.data_seq_num = 0 +          self.thread = Thread(target=self._tx_worker, daemon=True)          self.thread.start()      def _tx_worker(self):          self.log.info("Stream TX Worker Starting with {} packets/sec"                        .format(1/self.stream_spec.seconds_per_packet())) +        self.log.info("Downstream Buffer Capacity: {} packets or {} bytes" +                      .format(self.stream_spec.capacity_packets, self.stream_spec.capacity_bytes))          header = ChdrHeader()          start_time = time.time()          next_send = start_time          header.dst_epid = self.stream_spec.dst_epid          header.pkt_type = PacketType.DATA_NO_TS +        is_continuous = self.stream_spec.is_continuous          num_samps_left = None -        if not self.stream_spec.is_continuous: +        if not is_continuous: +            # TODO: Put sample format/width in the stream spec              num_samps_left = self.stream_spec.total_samples * 4 # SC16 is 4 bytes per sample -        for seq_num in self.stream_spec.seq_num_iter(): +        timestamp = self.stream_spec.init_timestamp  \ +            if self.stream_spec.is_timed else None + +        while is_continuous or num_samps_left > 0:              if self.stop:                  self.log.info("Stream Worker Stopped")                  break -            if num_samps_left == 0: -                break -            header.seq_num = seq_num -            timestamp = self.stream_spec.init_timestamp  \ -                if seq_num == 0 and self.stream_spec.is_timed \ -                else None -            packet = ChdrPacket(self.chdr_w, header, bytes(0)) +            header.seq_num = self.data_seq_num +            # When seq_num gets to 65535 (Max Unsigned 16 bit integer) +            # It wraps back around to 0 +            self.data_seq_num = int(self.data_seq_num + 1) & 0xFFFF +            packet = ChdrPacket(self.chdr_w, header, bytes(0), timestamp)              packet_samples = self.stream_spec.packet_samples              if num_samps_left is not None:                  packet_samples = min(packet_samples, num_samps_left) @@ -242,6 +258,14 @@ class ChdrOutputStream:                  time.sleep(delay)              next_send = next_send + self.stream_spec.seconds_per_packet() +            timestamp = None + +            # Check Flow Control to assert there is space downstream +            while not self._can_fit_packet(len(send_data)): +                strs_update = self.strs_queue.get() +                strs_payload = strs_update.get_payload_strs() +                self._update_recv(strs_payload) +              self.send_wrapper.send_data(send_data, self.stream_spec.addr)              self.xfer.count_packet(len(send_data)) @@ -254,3 +278,24 @@ class ChdrOutputStream:      def finish(self):          """Stops the ChdrOutputStream"""          self.stop = True + +    def queue_packet(self, packet): +        """ Place an incoming STRS packet in the Queue """ +        self.strs_queue.put_nowait(packet) + +    def _can_fit_packet(self, length): +        """ Can the downstream buffer fit a packet of length right now """ +        packets_in_transit = self.xfer.num_packets - self.recv.num_packets +        space_packets = self.stream_spec.capacity_packets - packets_in_transit +        bytes_in_transit = self.xfer.num_bytes - self.recv.num_bytes +        space_bytes = self.stream_spec.capacity_bytes - bytes_in_transit +        return space_packets > 0 and space_bytes >= length + +    def _update_recv(self, strs_payload): +        """ Update the Xfer counts for downstream receive using +        an incoming strs payload +        """ +        assert strs_payload.status == StrsStatus.OKAY, \ +            "Flow Control Error: STRS Status is {}".format(strs_payload.status) +        self.recv.num_packets = strs_payload.xfer_count_pkts +        self.recv.num_bytes = strs_payload.xfer_count_bytes diff --git a/mpm/python/usrp_mpm/simulator/rfnoc_common.py b/mpm/python/usrp_mpm/simulator/rfnoc_common.py index d7f61a4c1..b1dab559f 100644 --- a/mpm/python/usrp_mpm/simulator/rfnoc_common.py +++ b/mpm/python/usrp_mpm/simulator/rfnoc_common.py @@ -153,6 +153,8 @@ class StreamSpec:          self.sample_rate = None          self.dst_epid = None          self.addr = None +        self.capacity_packets = 0 +        self.capacity_bytes = 0      def set_timestamp_lo(self, low):          """Set the low 32 bits of the initial timestamp""" @@ -182,19 +184,6 @@ class StreamSpec:          assert self.sample_rate != 0          return self.packet_samples / self.sample_rate -    def seq_num_iter(self): -        """Returns a generator which returns an incrementing integer -        for each packet that should be sent. This is useful to set the -        seq_num of each transmitted packet. -        """ -        i = 0 -        while True: -            if not self.is_continuous: -                if i >= self.total_samples: -                    return -            yield i -            i += 1 -      def __str__(self):          return "StreamSpec{{total_samples: {}, is_continuous: {}, packet_samples: {}," \                 "sample_rate: {}, dst_epid: {}, addr: {}}}" \ diff --git a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py index b4477ee47..06aaaa046 100644 --- a/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py +++ b/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py @@ -8,7 +8,7 @@ This is a stream endpoint node, which is used in the RFNoCGraph class.  It also houses the logic to create output and input streams.  """  from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \ -    ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket +    ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus  from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER  from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED  from .chdr_stream import ChdrOutputStream, ChdrInputStream @@ -39,6 +39,8 @@ class StreamEndpointNode(Node):          self.dst_to_addr = None          self.source_gen = source_gen          self.sink_gen = sink_gen +        self.downstream_capacity = None +        self.strs_handlers = {}          self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,                                      self.ctrl_status_callback_out, self.ctrl_status_callback_in,                                      4, 4 * 8000) @@ -52,6 +54,12 @@ class StreamEndpointNode(Node):              self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))              addr = self.dst_to_addr(self)              self.send_strc(addr) +            def handle_initial_strs(strs_packet): +                payload = strs_packet.get_payload_strs() +                assert payload.status == StrsStatus.OKAY, \ +                    "Received STRS Packet Err: {}".format(payload.status) +                self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes) +            self.strs_handlers[self.epid] = handle_initial_strs          return STRM_STATUS_FC_ENABLED      def ctrl_status_callback_in(self, status): @@ -141,7 +149,11 @@ class StreamEndpointNode(Node):          self.input_stream.queue_packet(packet, num_bytes, sender)      def _handle_strs_packet(self, packet, **kwargs): -        pass # TODO: Implement flow control for output streams +        header = packet.get_header() +        if header.dst_epid in self.strs_handlers: +            self.strs_handlers.pop(header.dst_epid)(packet) +        else: +            self.output_stream.queue_packet(packet)      def _handle_strc_packet(self, packet, **kwargs):          self._handle_data_packet(packet, **kwargs) @@ -160,6 +172,8 @@ class StreamEndpointNode(Node):          payload = StrcPayload()          payload.src_epid = self.epid          payload.op_code = StrcOpCode.INIT +        payload.num_pkts = 10 +        payload.num_bytes = 10 * 1024 # 10 KB          packet = ChdrPacket(self.chdr_w, header, payload)          self.send_wrapper.send_packet(packet, addr) @@ -175,6 +189,9 @@ class StreamEndpointNode(Node):          assert self.output_stream is None, \              "Output Stream already running on epid: {}".format(self.epid)          stream_spec.dst_epid = self.dst_epid +        stream_spec.capacity_packets = self.downstream_capacity[0] +        stream_spec.capacity_bytes = self.downstream_capacity[1] +        self.downstream_capacity = None          self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),                                                stream_spec, self.send_wrapper) | 
