diff options
Diffstat (limited to 'mpm/python/usrp_mpm/xports')
-rw-r--r-- | mpm/python/usrp_mpm/xports/xportmgr_liberio.py | 69 | ||||
-rw-r--r-- | mpm/python/usrp_mpm/xports/xportmgr_udp.py | 218 |
2 files changed, 47 insertions, 240 deletions
diff --git a/mpm/python/usrp_mpm/xports/xportmgr_liberio.py b/mpm/python/usrp_mpm/xports/xportmgr_liberio.py index c9c5ee0f1..f7c1861af 100644 --- a/mpm/python/usrp_mpm/xports/xportmgr_liberio.py +++ b/mpm/python/usrp_mpm/xports/xportmgr_liberio.py @@ -8,8 +8,6 @@ Liberio Transport manager """ from builtins import object -from usrp_mpm.liberiotable import LiberioDispatcherTable -from usrp_mpm import lib class XportMgrLiberio(object): """ @@ -19,14 +17,9 @@ class XportMgrLiberio(object): liberio_label = 'liberio' # Number of available DMA channels max_chan = 4 - # Crossbar to which the Liberio DMA engine is connected - xbar_dev = "/dev/crossbar0" - xbar_port = 2 def __init__(self, log): - self.log = log - self._dma_dispatcher = LiberioDispatcherTable(self.liberio_label) - self._data_chan_ctr = 0 + self.log = log.getChild('Liberio') def init(self, args): """ @@ -36,7 +29,7 @@ class XportMgrLiberio(object): def deinit(self): " Clean up after a session terminates " - self._data_chan_ctr = 0 + pass def get_xport_info(self): """ @@ -48,50 +41,22 @@ class XportMgrLiberio(object): In this case, returns an empty dict. """ + assert hasattr(self, 'log') return {} - def request_xport( - self, - sid, - xport_type, - ): - """ - Return liberio xport info + def get_chdr_link_options(self): """ - assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA') - if xport_type == 'CTRL': - chan = 0 - elif xport_type == 'ASYNC_MSG': - chan = 1 - else: - if self.max_chan > 4: - chan = 2 + self._data_chan_ctr - self._data_chan_ctr += 1 - else: - if xport_type == 'RX_DATA': - chan = 2 - else: - chan = 3 - xport_info = { - 'type': 'liberio', - 'send_sid': str(sid), - 'muxed': str(xport_type in ('CTRL', 'ASYNC_MSG')) if (self.max_chan > 4) else 'True', - 'dma_chan': str(chan), - 'tx_dev': "/dev/tx-dma{}".format(chan), - 'rx_dev': "/dev/rx-dma{}".format(chan), - } - self.log.trace("Liberio: Chan: {} TX Device: {} RX Device: {}".format( - chan, xport_info['tx_dev'], xport_info['rx_dev'])) - self.log.trace("Liberio channel is muxed: %s", - "Yes" if xport_info['muxed'] else "No") - return [xport_info] - - def commit_xport(self, sid, xport_info): - " Commit liberio transport " - chan = int(xport_info['dma_chan']) - xbar_iface = lib.xbar.xbar(self.xbar_dev) - xbar_iface.set_route(sid.src_addr, self.xbar_port) - self._dma_dispatcher.set_route(sid.reversed(), chan) - self.log.trace("Liberio transport successfully committed!") - return True + Returns a list of dictionaries for returning by + PeriphManagerBase.get_chdr_link_options(). + Note: This requires a claim, which means that init() was called, and + deinit() was not yet called. + """ + return [ + { + 'tx_dev': "/dev/tx-dma{}".format(chan), + 'rx_dev': "/dev/rx-dma{}".format(chan), + 'dma_chan': str(chan), + } + for chan in range(self.max_chan) + ] diff --git a/mpm/python/usrp_mpm/xports/xportmgr_udp.py b/mpm/python/usrp_mpm/xports/xportmgr_udp.py index 17762bb76..761db4a08 100644 --- a/mpm/python/usrp_mpm/xports/xportmgr_udp.py +++ b/mpm/python/usrp_mpm/xports/xportmgr_udp.py @@ -1,5 +1,6 @@ # # Copyright 2017 Ettus Research, a National Instruments Company +# Copyright 2019 Ettus Research, a National Instruments Brand # # SPDX-License-Identifier: GPL-3.0-or-later # @@ -9,10 +10,8 @@ UDP Transport manager from builtins import object from six import iteritems, itervalues -from usrp_mpm.ethtable import EthDispatcherTable +from usrp_mpm.ethdispatch import EthDispatcherCtrl from usrp_mpm.sys_utils import net -from usrp_mpm.mpmtypes import SID -from usrp_mpm import lib DEFAULT_BRIDGE_MODE = False @@ -25,40 +24,38 @@ class XportMgrUDP(object): # iface_config = { # 'eth1': { # Add key for every Ethernet iface connected to the FPGA # 'label': 'misc-enet-regs0', # UIO label for the Eth table - # 'xbar': 0, # Which crossbar? 0 -> /dev/crossbar0 - # 'xbar_port': 0, # Which port on the crossbar it is connected to # }, # } iface_config = {} bridges = {} - # The control addresses are typically addresses bound to the controlling - # UHD session. When the requested source address is below or equal to this - # number, we override requested SID source addresses based on other logic. - max_ctrl_addr = 1 def __init__(self, log, args): - assert len(self.iface_config) + assert self.iface_config assert all(( - all((key in x for key in ('label', 'xbar', 'xbar_port'))) + all((key in x for key in ('label',))) for x in itervalues(self.iface_config) )) - self.log = log + self.log = log.getChild('UDP') self.log.trace("Initializing UDP xport manager...") self._possible_chdr_ifaces = self.iface_config.keys() self.log.trace("Identifying available network interfaces...") - self.chdr_port = EthDispatcherTable.DEFAULT_VITA_PORT[0] - self._chdr_ifaces = \ - self._init_interfaces(self._possible_chdr_ifaces) + self.chdr_port = EthDispatcherCtrl.DEFAULT_VITA_PORT[0] + self._chdr_ifaces = self._init_interfaces(self._possible_chdr_ifaces) self._bridge_mode = args.get('bridge_mode', DEFAULT_BRIDGE_MODE) self._eth_dispatchers = {} - self._allocations = {} - self._previous_block_ep = {} def _init_interfaces(self, possible_ifaces): """ Enumerate all network interfaces that are currently able to stream CHDR Returns a dictionary iface name -> iface info, where iface info is the return value of get_iface_info(). + + Arguments: + - possible_ifaces: A list of strings containing iface names, e.g. + ["sfp0", "sfp1"] + + Return Value: + A list of dictionaries. The keys are determined by net.get_iface_info(). """ self.log.trace("Testing available interfaces out of `{}'".format( list(possible_ifaces) @@ -83,7 +80,7 @@ class XportMgrUDP(object): "{} to {}." .format(len(valid_iface_infos), len(valid_iface_infos_filtered)) ) - if len(valid_iface_infos_filtered): + if valid_iface_infos_filtered: self.log.debug( "Found CHDR interfaces: `{}'" .format(", ".join(list(valid_iface_infos.keys()))) @@ -104,10 +101,11 @@ class XportMgrUDP(object): if len(self._chdr_ifaces) != 1 or bridge_iface != list(self.bridges.keys())[0]: self.log.error("No Bridge Interfaces found") raise RuntimeError("No Bridge Interfaces found") - self.log.info("Updated dispatchers in bridge mode with bridge interface {}".format( - bridge_iface)) + self.log.info( + "Updated dispatchers in bridge mode with bridge interface {}" + .format(bridge_iface)) self._eth_dispatchers = { - x: EthDispatcherTable(self.iface_config[x]['label']) + x: EthDispatcherCtrl(self.iface_config[x]['label']) for x in self.bridges[bridge_iface] } for dispatcher, table in iteritems(self._eth_dispatchers): @@ -130,7 +128,7 @@ class XportMgrUDP(object): for iface in self._chdr_ifaces: if iface not in self._eth_dispatchers: self._eth_dispatchers[iface] = \ - EthDispatcherTable(self.iface_config[iface]['label']) + EthDispatcherCtrl(self.iface_config[iface]['label']) self._eth_dispatchers[iface].set_ipv4_addr( self._chdr_ifaces[iface]['ip_addr'] ) @@ -139,8 +137,7 @@ class XportMgrUDP(object): """ Call this when the user calls 'init' on the periph manager """ - self._chdr_ifaces = \ - self._init_interfaces(self._possible_chdr_ifaces) + self._chdr_ifaces = self._init_interfaces(self._possible_chdr_ifaces) if "bridge_mode" in args: self._bridge_mode = args.get("bridge_mode") self._update_dispatchers() @@ -153,16 +150,10 @@ class XportMgrUDP(object): args.get('forward_eth', False), args.get('forward_bcast', False) ) - if 'preload_ethtables' in args: - self._preload_ethtables( - self._eth_dispatchers, - args['preload_ethtables'] - ) - def deinit(self): " Clean up after a session terminates " - self._allocations = {} + pass def get_xport_info(self): """ @@ -174,174 +165,25 @@ class XportMgrUDP(object): In this case, returns the available IP addresses. """ - available_interfaces = \ - self._init_interfaces(self._possible_chdr_ifaces) + available_interfaces = self._init_interfaces(self._possible_chdr_ifaces) return dict(zip( ("addr", "second_addr", "third_addr", "fourth_addr"), (x['ip_addr'] for x in itervalues(available_interfaces)) )) - def _preload_ethtables(self, eth_dispatchers, table_file): - """ - Populates the ethernet tables from a JSON file + def get_chdr_link_options(self): """ - import json - try: - eth_table_data = json.load(open(table_file)) - except ValueError as ex: - self.log.warning( - "Bad values in preloading table file: %s", - str(ex) - ) - return - self.log.info( - "Preloading Ethernet dispatch tables from JSON file `%s'.", - table_file - ) - for eth_iface, data in iteritems(eth_table_data): - if eth_iface not in eth_dispatchers: - self.log.warning( - "Request to preload eth dispatcher table for " - "iface `{}', but no such interface is " - "registered. Known interfaces: {}".format( - str(eth_iface), - ",".join(eth_dispatchers.keys()) - ) - ) - continue - eth_dispatcher = eth_dispatchers[eth_iface] - self.log.debug("Preloading {} dispatch table".format(eth_iface)) - try: - for dst_addr, udp_data in iteritems(data): - sid = SID() - sid.set_dst_addr(int(dst_addr)) - eth_dispatcher.set_route( - sid, - udp_data['ip_addr'], - udp_data['port'], - udp_data.get('mac_addr', None) - ) - except ValueError as ex: - self.log.warning( - "Bad values in preloading table file: %s", - str(ex) - ) + Returns a list of dictionaries for returning by + PeriphManagerBase.get_chdr_link_options(). - def get_xbar_dev(self, iface): - """ - Given an Ethernet interface (e.g., 'eth1') returns the crossbar device - it is connected to. + Note: This requires a claim, which means that init() was called, and + deinit() was not yet called. """ - xbar_idx = self.iface_config[iface]['xbar'] - return "/dev/crossbar{}".format(xbar_idx) - - def request_xport( - self, - sid, - xport_type, - alloc_limit=2 - ): - """ - Return UDP xport info - """ - def fixup_sid(sid, iface_name): - " Modify the source SID (e.g. the UHD SID) " - if sid.src_addr <= self.max_ctrl_addr: - sid.src_addr = self.iface_config[iface_name]['ctrl_src_addr'] - return sid - - def sort_xport_info(xport): - """ - We sort xport_info (which is a list of xport) as follows: - 1. Look at current allocation of xport src_addr (which is the addr - of host). If the allocation too large, in this case larger or - equal to 2 (since 2*125 = 250MS/s = max bandwidth of SFP+ port), - we will use the allocation for sorting. - 2. Else, we need to look at the destination block. The priority will - yield to the xport that has the previous destination block - that is the same as this coming destination block. - Note: smaller number return is the higher chance to be picked - """ - sid = SID(xport['send_sid']) - src_addr = sid.src_addr - prev_block = -1 - if src_addr in self._previous_block_ep: - prev_block = self._previous_block_ep[src_addr] - allocation = int(xport['allocation']) - if allocation >= alloc_limit: - return allocation - else: - return allocation if prev_block != sid.get_dst_block() else -1; - - assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA') - allocation_getter = lambda iface: { - 'CTRL': 0, - 'ASYNC_MSG': 0, - 'RX_DATA': self._allocations.get(iface, {}).get('rx', 0), - 'TX_DATA': self._allocations.get(iface, {}).get('tx', 0), - }[xport_type] - xport_info = sorted([ + return [ { - 'type': 'UDP', 'ipv4': str(iface_info['ip_addr']), 'port': str(self.chdr_port), - 'send_sid': str(fixup_sid(sid, iface_name)), - 'allocation': str(allocation_getter(iface_name)), - 'xport_type': xport_type, - 'link_speed': str(iface_info['link_speed']) + 'link_rate': str(int(iface_info['link_speed'] * 1e9 / 8)) } for iface_name, iface_info in iteritems(self._chdr_ifaces) ] - , key=lambda x: sort_xport_info(x) - , reverse=False) - return xport_info - - def commit_xport(self, sid, xport_info): - """ - Commit transport - - Saves the transport configuration to the device. - Returns the status of the commit. - """ - self.log.trace("Sanity checking xport_info %s...", str(xport_info)) - assert xport_info['type'] == 'UDP' - assert any([xport_info['ipv4'] == x['ip_addr'] - for x in itervalues(self._chdr_ifaces)]) - assert xport_info['port'] == str(self.chdr_port) - assert len(xport_info.get('src_ipv4')) > 5 - assert int(xport_info.get('src_port')) > 0 - sender_addr = xport_info['src_ipv4'] - sender_port = int(xport_info['src_port']) - self.log.trace("Incoming connection is coming from %s:%d", - sender_addr, sender_port) - mac_addr = net.get_mac_addr(sender_addr) - if mac_addr is None: - raise RuntimeError( - "Could not find MAC address for IP address {}".format( - sender_addr)) - self.log.trace("Incoming connection is coming from %s", - mac_addr) - eth_iface = net.ip_addr_to_iface(xport_info['ipv4'], self._chdr_ifaces) - xbar_port = self.iface_config[eth_iface]['xbar_port'] - self.log.trace("Using Ethernet interface %s, crossbar port %d", - eth_iface, xbar_port) - xbar_iface = lib.xbar.xbar(self.get_xbar_dev(eth_iface)) - xbar_iface.set_route(sid.src_addr, xbar_port) - self._eth_dispatchers[eth_iface].set_route( - sid.reversed(), sender_addr, sender_port) - self.log.trace("UDP transport successfully committed!") - self._previous_block_ep[sid.src_addr] = sid.get_dst_block() - if xport_info.get('xport_type') == 'TX_DATA': - self._allocations[eth_iface] = \ - {'tx': self._allocations.get(eth_iface, {}).get('tx', 0) + 1} - if xport_info.get('xport_type') == 'RX_DATA': - self._allocations[eth_iface] = \ - {'rx': self._allocations.get(eth_iface, {}).get('rx', 0) + 1} - self.log.trace( - "New link allocations for %s: TX: %d RX: %d", - eth_iface, - self._allocations.get(eth_iface, {}).get('tx', 0), - self._allocations.get(eth_iface, {}).get('rx', 0), - ) - return True - |