aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/xports/xportmgr_udp.py
blob: 40ba22414cb5fe341b0fee27ee2d52e824ce1b14 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#
# Copyright 2017 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
UDP Transport manager
"""

from builtins import object
from six import iteritems, itervalues
from usrp_mpm.ethtable import EthDispatcherTable
from usrp_mpm.sys_utils import net
from usrp_mpm.mpmtypes import SID
from usrp_mpm import lib

class XportMgrUDP(object):
    """
    Transport manager for UDP connections
    """
    # The interface configuration describes how the Ethernet interfaces are
    # hooked up to the crossbar and the FPGA. It could look like this:
    # iface_config = {
    #     'eth1': { # Add key for every Ethernet iface connected to the FPGA
    #         'label': 'misc-enet-regs0', # UIO label for the Eth table
    #         'xbar': 0, # Which crossbar? 0 -> /dev/crossbar0
    #         'xbar_port': 0, # Which port on the crossbar it is connected to
    #     },
    # }
    iface_config = {}
    # The control addresses are typically addresses bound to the controlling
    # UHD session. When the requested source address is below or equal to this
    # number, we override requested SID source addresses based on other logic.
    max_ctrl_addr = 1

    def __init__(self, log):
        assert len(self.iface_config)
        assert all((
            all((key in x for key in ('label', 'xbar', 'xbar_port')))
            for x in itervalues(self.iface_config)
        ))
        self.log = log
        self.log.trace("Initializing UDP xport manager...")
        self._possible_chdr_ifaces = self.iface_config.keys()
        self.log.trace("Identifying available network interfaces...")
        self.chdr_port = EthDispatcherTable.DEFAULT_VITA_PORT[0]
        self._chdr_ifaces = \
            self._init_interfaces(self._possible_chdr_ifaces)
        self._eth_dispatchers = {}
        self._allocations = {}
        self._previous_block_ep = {}

    def _init_interfaces(self, possible_ifaces):
        """
        Enumerate all network interfaces that are currently able to stream CHDR
        Returns a dictionary iface name -> iface info, where iface info is the
        return value of get_iface_info().
        """
        self.log.trace("Testing available interfaces out of `{}'".format(
            list(possible_ifaces)
        ))
        valid_iface_infos = {
            x: net.get_iface_info(x)
            for x in net.get_valid_interfaces(possible_ifaces)
        }
        # Because get_iface_info() and get_valid_interfaces() are not one atomic
        # operation, there are rare scenarios when their return values are
        # inconsistent. To catch these cases, we filter the list again and warn
        # the user. Usually, this is not a problem and the next call to
        # _init_interfaces() will be back to normal.
        valid_iface_infos_filtered = {
            x: valid_iface_infos[x]
            for x in valid_iface_infos
            if valid_iface_infos[x]['ip_addr']
        }
        if len(valid_iface_infos) != len(valid_iface_infos_filtered):
            self.log.warning(
                "Number of detected CHDR devices is inconsistent. Dropped from "
                "{} to {}."
                .format(len(valid_iface_infos), len(valid_iface_infos_filtered))
            )
        if len(valid_iface_infos_filtered):
            self.log.debug(
                "Found CHDR interfaces: `{}'"
                .format(", ".join(list(valid_iface_infos.keys())))
            )
        else:
            self.log.info("No CHDR interfaces found!")
        return valid_iface_infos_filtered

    def _update_dispatchers(self):
        """
        Updates the self._eth_dispatchers dictionary, makes sure that all IP
        addresses are programmed correctly.

        After calling this, _chdr_ifaces and _eth_dispatchers are in sync.
        """
        ifaces_to_remove = [
            x for x in self._eth_dispatchers.keys()
            if x not in self._chdr_ifaces
        ]
        for iface in ifaces_to_remove:
            self._eth_dispatchers.pop(iface)
        for iface in self._chdr_ifaces:
            if iface not in self._eth_dispatchers:
                self._eth_dispatchers[iface] = \
                    EthDispatcherTable(self.iface_config[iface]['label'])
            self._eth_dispatchers[iface].set_ipv4_addr(
                self._chdr_ifaces[iface]['ip_addr']
            )

    def init(self, args):
        """
        Call this when the user calls 'init' on the periph manager
        """
        self._chdr_ifaces = \
            self._init_interfaces(self._possible_chdr_ifaces)
        self._update_dispatchers()
        for _, table in iteritems(self._eth_dispatchers):
            if 'forward_eth' in args or 'forward_bcast' in args:
                table.set_forward_policy(
                    args.get('forward_eth', False),
                    args.get('forward_bcast', False)
                )
        if 'preload_ethtables' in args:
            self._preload_ethtables(
                self._eth_dispatchers,
                args['preload_ethtables']
            )

    def deinit(self):
        " Clean up after a session terminates "
        self._allocations = {}

    def get_xport_info(self):
        """
        Returns a dictionary of useful information, e.g. for appending into the
        device info.

        Note: This can be run by callers not owning a claim, even when the
        device has been claimed by someone else.

        In this case, returns the available IP addresses.
        """
        available_interfaces = \
            self._init_interfaces(self._possible_chdr_ifaces)
        return dict(zip(
            ("addr", "second_addr", "third_addr", "fourth_addr"),
            (x['ip_addr'] for x in itervalues(available_interfaces))
        ))

    def _preload_ethtables(self, eth_dispatchers, table_file):
        """
        Populates the ethernet tables from a JSON file
        """
        import json
        try:
            eth_table_data = json.load(open(table_file))
        except ValueError as ex:
            self.log.warning(
                "Bad values in preloading table file: %s",
                str(ex)
            )
            return
        self.log.info(
            "Preloading Ethernet dispatch tables from JSON file `%s'.",
            table_file
        )
        for eth_iface, data in iteritems(eth_table_data):
            if eth_iface not in eth_dispatchers:
                self.log.warning(
                    "Request to preload eth dispatcher table for "
                    "iface `{}', but no such interface is "
                    "registered. Known interfaces: {}".format(
                        str(eth_iface),
                        ",".join(eth_dispatchers.keys())
                    )
                )
                continue
            eth_dispatcher = eth_dispatchers[eth_iface]
            self.log.debug("Preloading {} dispatch table".format(eth_iface))
            try:
                for dst_ep, udp_data in iteritems(data):
                    sid = SID()
                    sid.set_dst_ep(int(dst_ep))
                    eth_dispatcher.set_route(
                        sid,
                        udp_data['ip_addr'],
                        udp_data['port'],
                        udp_data.get('mac_addr', None)
                    )
            except ValueError as ex:
                self.log.warning(
                    "Bad values in preloading table file: %s",
                    str(ex)
                )

    def get_xbar_dev(self, iface):
        """
        Given an Ethernet interface (e.g., 'eth1') returns the crossbar device
        it is connected to.
        """
        xbar_idx = self.iface_config[iface]['xbar']
        return "/dev/crossbar{}".format(xbar_idx)

    def request_xport(
            self,
            sid,
            xport_type,
        ):
        """
        Return UDP xport info
        """
        def fixup_sid(sid, iface_name):
            " Modify the source SID (e.g. the UHD SID) "
            if sid.src_addr <= self.max_ctrl_addr:
                sid.src_addr = self.iface_config[iface_name]['ctrl_src_addr']
            return sid

        def sort_xport_info(xport):
            """
            We sort xport_info (which is a list of xport) as follows:
            1. Look at current allocation of xport src_addr (which is the addr
               of host). If the allocation too large, in this case larger or
               equal to 2 (since 2*125 = 250MS/s = max bandwidth of SFP+ port),
               we will use the allocation for sorting.
            2. Else, we need to look at the destination block. The priority will
               yield to the xport that has the previous destination block
               that is the same as this coming destination block.
            Note: smaller number return is the higher chance to be picked
            """
            sid = SID(xport['send_sid'])
            src_addr = sid.src_addr
            prev_block = -1
            if src_addr in self._previous_block_ep:
                prev_block = self._previous_block_ep[src_addr]
            allocation = int(xport['allocation'])
            if allocation >= 2:
                return allocation
            else:
                return allocation if prev_block != sid.get_dst_block() else -1;

        assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA')
        allocation_getter = lambda iface: {
            'CTRL': 0,
            'ASYNC_MSG': 0,
            'RX_DATA': self._allocations.get(iface, {}).get('rx', 0),
            'TX_DATA': self._allocations.get(iface, {}).get('tx', 0),
        }[xport_type]
        xport_info = sorted([
            {
                'type': 'UDP',
                'ipv4': str(iface_info['ip_addr']),
                'port': str(self.chdr_port),
                'send_sid': str(fixup_sid(sid, iface_name)),
                'allocation': str(allocation_getter(iface_name)),
                'xport_type': xport_type,
                'link_speed': str(iface_info['link_speed'])
            }
            for iface_name, iface_info in iteritems(self._chdr_ifaces)
        ]
            , key=lambda x: sort_xport_info(x)
            , reverse=False)
        return xport_info

    def commit_xport(self, sid, xport_info):
        """
        fuu
        """
        self.log.trace("Sanity checking xport_info %s...", str(xport_info))
        assert xport_info['type'] == 'UDP'
        assert any([xport_info['ipv4'] == x['ip_addr']
                    for x in itervalues(self._chdr_ifaces)])
        assert xport_info['port'] == str(self.chdr_port)
        assert len(xport_info.get('src_ipv4')) > 5
        assert int(xport_info.get('src_port')) > 0
        sender_addr = xport_info['src_ipv4']
        sender_port = int(xport_info['src_port'])
        self.log.trace("Incoming connection is coming from %s:%d",
                       sender_addr, sender_port)
        mac_addr = net.get_mac_addr(sender_addr)
        if mac_addr is None:
            raise RuntimeError(
                "Could not find MAC address for IP address {}".format(
                    sender_addr))
        self.log.trace("Incoming connection is coming from %s",
                       mac_addr)
        eth_iface = net.ip_addr_to_iface(xport_info['ipv4'], self._chdr_ifaces)
        xbar_port = self.iface_config[eth_iface]['xbar_port']
        self.log.trace("Using Ethernet interface %s, crossbar port %d",
                       eth_iface, xbar_port)
        xbar_iface = lib.xbar.xbar.make(self.get_xbar_dev(eth_iface))
        xbar_iface.set_route(sid.src_addr, xbar_port)
        self._eth_dispatchers[eth_iface].set_route(
            sid.reversed(), sender_addr, sender_port)
        self.log.trace("UDP transport successfully committed!")
        self._previous_block_ep[sid.src_addr] = sid.get_dst_block()
        if xport_info.get('xport_type') == 'TX_DATA':
            self._allocations[eth_iface] = \
                {'tx': self._allocations.get(eth_iface, {}).get('tx', 0) + 1}
        if xport_info.get('xport_type') == 'RX_DATA':
            self._allocations[eth_iface] = \
                {'rx': self._allocations.get(eth_iface, {}).get('rx', 0) + 1}
        self.log.trace(
            "New link allocations for %s: TX: %d  RX: %d",
            eth_iface,
            self._allocations.get(eth_iface, {}).get('tx', 0),
            self._allocations.get(eth_iface, {}).get('rx', 0),
        )
        return True