aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/simulator/stream_endpoint_node.py
blob: 06aaaa0465bff58b8d2c7864f6506266afd94723 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
#
# Copyright 2020 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
This is a stream endpoint node, which is used in the RFNoCGraph class.
It also houses the logic to create output and input streams.
"""
from uhd.chdr import MgmtOpCode, MgmtOpCfg, MgmtOp, PacketType, CtrlStatus, CtrlOpCode, \
    ChdrHeader, StrcOpCode, StrcPayload, ChdrPacket, StrsStatus
from .rfnoc_common import Node, NodeType, to_iter, swap_src_dst, RETURN_TO_SENDER
from .stream_ep_regs import StreamEpRegs, STRM_STATUS_FC_ENABLED
from .chdr_stream import ChdrOutputStream, ChdrInputStream

class StreamEndpointNode(Node):
    """Represents a Stream endpoint node

    This class contains a StreamEpRegs object. To clarify, management
    packets access these registers, while control packets access the
    registers of the noc_blocks which are held in the RFNoCGraph and
    passed into handle_packet as the regs parameter
    """
    def __init__(self, node_inst, source_gen, sink_gen):
        super().__init__(node_inst)
        self.epid = node_inst
        self.dst_epid = None
        self.upstream = None
        # ---- These 4 aren't configurable right now
        self.has_data = True
        self.has_ctrl = True
        self.input_ports = 1
        self.output_ports = 1
        # ----
        self.input_stream = None
        self.output_stream = None
        self.chdr_w = None
        self.send_wrapper = None
        self.dst_to_addr = None
        self.source_gen = source_gen
        self.sink_gen = sink_gen
        self.downstream_capacity = None
        self.strs_handlers = {}
        self.ep_regs = StreamEpRegs(self.get_epid, self.set_epid, self.set_dst_epid,
                                    self.ctrl_status_callback_out, self.ctrl_status_callback_in,
                                    4, 4 * 8000)

    def ctrl_status_callback_out(self, status):
        """Called by the ep_regs when on a write to the
        REG_OSTRM_CTRL_STATUS register
        """
        if status.cfg_start:
            # This only creates a new ChdrOutputStream if the cfg_start flag is set
            self.log.info("Starting Stream EPID:{} -> EPID:{}".format(self.epid, self.dst_epid))
            addr = self.dst_to_addr(self)
            self.send_strc(addr)
            def handle_initial_strs(strs_packet):
                payload = strs_packet.get_payload_strs()
                assert payload.status == StrsStatus.OKAY, \
                    "Received STRS Packet Err: {}".format(payload.status)
                self.downstream_capacity = (payload.capacity_pkts, payload.capacity_bytes)
            self.strs_handlers[self.epid] = handle_initial_strs
        return STRM_STATUS_FC_ENABLED

    def ctrl_status_callback_in(self, status):
        """Called by the ep_regs on a write to the
        REG_OSTRM_CTRL_STATUS register
        """
        # This always triggers the graph to create a new ChdrInputStream
        self.begin_input()
        return STRM_STATUS_FC_ENABLED

    def graph_init(self, log, set_device_id, send_wrapper, chdr_w, dst_to_addr, **kwargs):
        super().graph_init(log, set_device_id)
        self.ep_regs.log = log
        self.chdr_w = chdr_w
        self.send_wrapper = send_wrapper
        self.dst_to_addr = dst_to_addr

    def get_type(self):
        return NodeType.STRM_EP

    def get_epid(self):
        """Get this endpoint's endpoint id"""
        return self.epid

    def set_epid(self, epid):
        """Set this endpoint's endpoint id"""
        self.epid = epid

    def set_dst_epid(self, dst_epid):
        """Set this endpoint's destination endpoint id"""
        self.dst_epid = dst_epid

    def _handle_mgmt_packet(self, packet, **kwargs):
        send_upstream = False
        payload = packet.get_payload_mgmt()
        our_hop = payload.pop_hop()
        for op in to_iter(our_hop.get_op, our_hop.get_num_ops()):
            if op.op_code == MgmtOpCode.INFO_REQ:
                ext_info = 0
                ext_info |= (1 if self.has_ctrl else 0) << 0
                ext_info |= (1 if self.has_data else 0) << 1
                ext_info |= (self.input_ports & 0x3F) << 2
                ext_info |= (self.output_ports & 0x3F) << 8
                payload.get_hop(0).add_op(self.info_response(ext_info))
            elif op.op_code == MgmtOpCode.RETURN:
                send_upstream = True
                swap_src_dst(packet, payload)
            elif op.op_code == MgmtOpCode.NOP:
                pass
            elif op.op_code == MgmtOpCode.CFG_RD_REQ:
                request = MgmtOpCfg.parse(op.get_op_payload())
                value = self.ep_regs.read(request.addr)
                payload.get_hop(0).add_op(MgmtOp(MgmtOpCode.CFG_RD_RESP,
                                                 MgmtOpCfg(request.addr, value)))
            elif op.op_code == MgmtOpCode.CFG_WR_REQ:
                request = MgmtOpCfg.parse(op.get_op_payload())
                self.ep_regs.write(request.addr, request.data)
            else:
                raise NotImplementedError("op_code {} is not implemented for StreamEndpointNode"
                                          .format(op.op_code))
        self.log.trace("Stream Endpoint {} processed hop:\n{}"
                       .format(self.node_inst, our_hop))
        packet.set_payload(payload)
        if send_upstream:
            return RETURN_TO_SENDER
        self.log.trace("Stream Endpoint {} received packet:\n{}"
                       .format(self.node_inst, packet))

    def _handle_ctrl_packet(self, packet, regs, **kwargs):
        payload = packet.get_payload_ctrl()
        swap_src_dst(packet, payload)
        if payload.status != CtrlStatus.OKAY:
            raise RuntimeError("Control Status not OK: {}".format(payload.status))
        if payload.op_code == CtrlOpCode.READ:
            payload.is_ack = True
            payload.set_data([regs.read(payload.address)])
        elif payload.op_code == CtrlOpCode.WRITE:
            payload.is_ack = True
            regs.write(payload.address, payload.get_data()[0])
        else:
            raise NotImplementedError("Unknown Control OpCode: {}".format(payload.op_code))
        packet.set_payload(payload)
        return RETURN_TO_SENDER

    def _handle_data_packet(self, packet, num_bytes, sender, **kwargs):
        assert self.input_stream is not None
        self.input_stream.queue_packet(packet, num_bytes, sender)

    def _handle_strs_packet(self, packet, **kwargs):
        header = packet.get_header()
        if header.dst_epid in self.strs_handlers:
            self.strs_handlers.pop(header.dst_epid)(packet)
        else:
            self.output_stream.queue_packet(packet)

    def _handle_strc_packet(self, packet, **kwargs):
        self._handle_data_packet(packet, **kwargs)

    def send_strc(self, addr):
        """Send a Stream Command packet from the specified stream_ep
        to the specified address.

        This is not handled in ChdrOutputStream because the STRC must be
        dispatched before UHD configures the samples per packet,
        which is required to complete a StreamSpec
        """
        header = ChdrHeader()
        header.dst_epid = self.dst_epid
        header.pkt_type = PacketType.STRC
        payload = StrcPayload()
        payload.src_epid = self.epid
        payload.op_code = StrcOpCode.INIT
        payload.num_pkts = 10
        payload.num_bytes = 10 * 1024 # 10 KB
        packet = ChdrPacket(self.chdr_w, header, payload)
        self.send_wrapper.send_packet(packet, addr)

    def begin_output(self, stream_spec):
        """Spin up a new ChdrOutputStream thread which transmits from src_epid
        according to stream_spec.

        This is triggered from RFNoC Graph when the radio receives a
        Stream Command
        """
        # As of now, only one stream endpoint port per stream endpoint
        # is supported.
        assert self.output_stream is None, \
            "Output Stream already running on epid: {}".format(self.epid)
        stream_spec.dst_epid = self.dst_epid
        stream_spec.capacity_packets = self.downstream_capacity[0]
        stream_spec.capacity_bytes = self.downstream_capacity[1]
        self.downstream_capacity = None
        self.output_stream = ChdrOutputStream(self.log, self.chdr_w, self.source_gen(),
                                              stream_spec, self.send_wrapper)

    def end_output(self):
        """Stops src_epid's current transmission. This opens up the sep
        to new transmissions in the future.

        This is triggered either by the RFNoC Graph when the radio
        receives a Stop Stream command or when the transmission has no
        more samples to send.
        """
        self.output_stream.finish()
        self.output_stream = None

    def begin_input(self):
        """Spin up a new ChdrInputStream thread which receives all data and strc

        This is triggered by RFNoC Graph when there is a write to the
        REG_ISTRM_CTRL_STATUS register
        """
        # As of now, only one stream endpoint port per stream endpoint
        # is supported.

        # Rx streams aren't explicitly ended by UHD. If we try to start
        # a new one on the same epid, just quietly close the old one.
        if self.input_stream is not None:
            self.input_stream.finish()
        self.input_stream = ChdrInputStream(self.log, self.chdr_w,
                                            self.sink_gen(), self.send_wrapper, self.epid)