aboutsummaryrefslogtreecommitdiffstats
path: root/mpm/python/usrp_mpm/xports
diff options
context:
space:
mode:
Diffstat (limited to 'mpm/python/usrp_mpm/xports')
-rw-r--r--mpm/python/usrp_mpm/xports/xportmgr_liberio.py69
-rw-r--r--mpm/python/usrp_mpm/xports/xportmgr_udp.py218
2 files changed, 47 insertions, 240 deletions
diff --git a/mpm/python/usrp_mpm/xports/xportmgr_liberio.py b/mpm/python/usrp_mpm/xports/xportmgr_liberio.py
index c9c5ee0f1..f7c1861af 100644
--- a/mpm/python/usrp_mpm/xports/xportmgr_liberio.py
+++ b/mpm/python/usrp_mpm/xports/xportmgr_liberio.py
@@ -8,8 +8,6 @@ Liberio Transport manager
"""
from builtins import object
-from usrp_mpm.liberiotable import LiberioDispatcherTable
-from usrp_mpm import lib
class XportMgrLiberio(object):
"""
@@ -19,14 +17,9 @@ class XportMgrLiberio(object):
liberio_label = 'liberio'
# Number of available DMA channels
max_chan = 4
- # Crossbar to which the Liberio DMA engine is connected
- xbar_dev = "/dev/crossbar0"
- xbar_port = 2
def __init__(self, log):
- self.log = log
- self._dma_dispatcher = LiberioDispatcherTable(self.liberio_label)
- self._data_chan_ctr = 0
+ self.log = log.getChild('Liberio')
def init(self, args):
"""
@@ -36,7 +29,7 @@ class XportMgrLiberio(object):
def deinit(self):
" Clean up after a session terminates "
- self._data_chan_ctr = 0
+ pass
def get_xport_info(self):
"""
@@ -48,50 +41,22 @@ class XportMgrLiberio(object):
In this case, returns an empty dict.
"""
+ assert hasattr(self, 'log')
return {}
- def request_xport(
- self,
- sid,
- xport_type,
- ):
- """
- Return liberio xport info
+ def get_chdr_link_options(self):
"""
- assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA')
- if xport_type == 'CTRL':
- chan = 0
- elif xport_type == 'ASYNC_MSG':
- chan = 1
- else:
- if self.max_chan > 4:
- chan = 2 + self._data_chan_ctr
- self._data_chan_ctr += 1
- else:
- if xport_type == 'RX_DATA':
- chan = 2
- else:
- chan = 3
- xport_info = {
- 'type': 'liberio',
- 'send_sid': str(sid),
- 'muxed': str(xport_type in ('CTRL', 'ASYNC_MSG')) if (self.max_chan > 4) else 'True',
- 'dma_chan': str(chan),
- 'tx_dev': "/dev/tx-dma{}".format(chan),
- 'rx_dev': "/dev/rx-dma{}".format(chan),
- }
- self.log.trace("Liberio: Chan: {} TX Device: {} RX Device: {}".format(
- chan, xport_info['tx_dev'], xport_info['rx_dev']))
- self.log.trace("Liberio channel is muxed: %s",
- "Yes" if xport_info['muxed'] else "No")
- return [xport_info]
-
- def commit_xport(self, sid, xport_info):
- " Commit liberio transport "
- chan = int(xport_info['dma_chan'])
- xbar_iface = lib.xbar.xbar(self.xbar_dev)
- xbar_iface.set_route(sid.src_addr, self.xbar_port)
- self._dma_dispatcher.set_route(sid.reversed(), chan)
- self.log.trace("Liberio transport successfully committed!")
- return True
+ Returns a list of dictionaries for returning by
+ PeriphManagerBase.get_chdr_link_options().
+ Note: This requires a claim, which means that init() was called, and
+ deinit() was not yet called.
+ """
+ return [
+ {
+ 'tx_dev': "/dev/tx-dma{}".format(chan),
+ 'rx_dev': "/dev/rx-dma{}".format(chan),
+ 'dma_chan': str(chan),
+ }
+ for chan in range(self.max_chan)
+ ]
diff --git a/mpm/python/usrp_mpm/xports/xportmgr_udp.py b/mpm/python/usrp_mpm/xports/xportmgr_udp.py
index 17762bb76..761db4a08 100644
--- a/mpm/python/usrp_mpm/xports/xportmgr_udp.py
+++ b/mpm/python/usrp_mpm/xports/xportmgr_udp.py
@@ -1,5 +1,6 @@
#
# Copyright 2017 Ettus Research, a National Instruments Company
+# Copyright 2019 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
@@ -9,10 +10,8 @@ UDP Transport manager
from builtins import object
from six import iteritems, itervalues
-from usrp_mpm.ethtable import EthDispatcherTable
+from usrp_mpm.ethdispatch import EthDispatcherCtrl
from usrp_mpm.sys_utils import net
-from usrp_mpm.mpmtypes import SID
-from usrp_mpm import lib
DEFAULT_BRIDGE_MODE = False
@@ -25,40 +24,38 @@ class XportMgrUDP(object):
# iface_config = {
# 'eth1': { # Add key for every Ethernet iface connected to the FPGA
# 'label': 'misc-enet-regs0', # UIO label for the Eth table
- # 'xbar': 0, # Which crossbar? 0 -> /dev/crossbar0
- # 'xbar_port': 0, # Which port on the crossbar it is connected to
# },
# }
iface_config = {}
bridges = {}
- # The control addresses are typically addresses bound to the controlling
- # UHD session. When the requested source address is below or equal to this
- # number, we override requested SID source addresses based on other logic.
- max_ctrl_addr = 1
def __init__(self, log, args):
- assert len(self.iface_config)
+ assert self.iface_config
assert all((
- all((key in x for key in ('label', 'xbar', 'xbar_port')))
+ all((key in x for key in ('label',)))
for x in itervalues(self.iface_config)
))
- self.log = log
+ self.log = log.getChild('UDP')
self.log.trace("Initializing UDP xport manager...")
self._possible_chdr_ifaces = self.iface_config.keys()
self.log.trace("Identifying available network interfaces...")
- self.chdr_port = EthDispatcherTable.DEFAULT_VITA_PORT[0]
- self._chdr_ifaces = \
- self._init_interfaces(self._possible_chdr_ifaces)
+ self.chdr_port = EthDispatcherCtrl.DEFAULT_VITA_PORT[0]
+ self._chdr_ifaces = self._init_interfaces(self._possible_chdr_ifaces)
self._bridge_mode = args.get('bridge_mode', DEFAULT_BRIDGE_MODE)
self._eth_dispatchers = {}
- self._allocations = {}
- self._previous_block_ep = {}
def _init_interfaces(self, possible_ifaces):
"""
Enumerate all network interfaces that are currently able to stream CHDR
Returns a dictionary iface name -> iface info, where iface info is the
return value of get_iface_info().
+
+ Arguments:
+ - possible_ifaces: A list of strings containing iface names, e.g.
+ ["sfp0", "sfp1"]
+
+ Return Value:
+ A list of dictionaries. The keys are determined by net.get_iface_info().
"""
self.log.trace("Testing available interfaces out of `{}'".format(
list(possible_ifaces)
@@ -83,7 +80,7 @@ class XportMgrUDP(object):
"{} to {}."
.format(len(valid_iface_infos), len(valid_iface_infos_filtered))
)
- if len(valid_iface_infos_filtered):
+ if valid_iface_infos_filtered:
self.log.debug(
"Found CHDR interfaces: `{}'"
.format(", ".join(list(valid_iface_infos.keys())))
@@ -104,10 +101,11 @@ class XportMgrUDP(object):
if len(self._chdr_ifaces) != 1 or bridge_iface != list(self.bridges.keys())[0]:
self.log.error("No Bridge Interfaces found")
raise RuntimeError("No Bridge Interfaces found")
- self.log.info("Updated dispatchers in bridge mode with bridge interface {}".format(
- bridge_iface))
+ self.log.info(
+ "Updated dispatchers in bridge mode with bridge interface {}"
+ .format(bridge_iface))
self._eth_dispatchers = {
- x: EthDispatcherTable(self.iface_config[x]['label'])
+ x: EthDispatcherCtrl(self.iface_config[x]['label'])
for x in self.bridges[bridge_iface]
}
for dispatcher, table in iteritems(self._eth_dispatchers):
@@ -130,7 +128,7 @@ class XportMgrUDP(object):
for iface in self._chdr_ifaces:
if iface not in self._eth_dispatchers:
self._eth_dispatchers[iface] = \
- EthDispatcherTable(self.iface_config[iface]['label'])
+ EthDispatcherCtrl(self.iface_config[iface]['label'])
self._eth_dispatchers[iface].set_ipv4_addr(
self._chdr_ifaces[iface]['ip_addr']
)
@@ -139,8 +137,7 @@ class XportMgrUDP(object):
"""
Call this when the user calls 'init' on the periph manager
"""
- self._chdr_ifaces = \
- self._init_interfaces(self._possible_chdr_ifaces)
+ self._chdr_ifaces = self._init_interfaces(self._possible_chdr_ifaces)
if "bridge_mode" in args:
self._bridge_mode = args.get("bridge_mode")
self._update_dispatchers()
@@ -153,16 +150,10 @@ class XportMgrUDP(object):
args.get('forward_eth', False),
args.get('forward_bcast', False)
)
- if 'preload_ethtables' in args:
- self._preload_ethtables(
- self._eth_dispatchers,
- args['preload_ethtables']
- )
-
def deinit(self):
" Clean up after a session terminates "
- self._allocations = {}
+ pass
def get_xport_info(self):
"""
@@ -174,174 +165,25 @@ class XportMgrUDP(object):
In this case, returns the available IP addresses.
"""
- available_interfaces = \
- self._init_interfaces(self._possible_chdr_ifaces)
+ available_interfaces = self._init_interfaces(self._possible_chdr_ifaces)
return dict(zip(
("addr", "second_addr", "third_addr", "fourth_addr"),
(x['ip_addr'] for x in itervalues(available_interfaces))
))
- def _preload_ethtables(self, eth_dispatchers, table_file):
- """
- Populates the ethernet tables from a JSON file
+ def get_chdr_link_options(self):
"""
- import json
- try:
- eth_table_data = json.load(open(table_file))
- except ValueError as ex:
- self.log.warning(
- "Bad values in preloading table file: %s",
- str(ex)
- )
- return
- self.log.info(
- "Preloading Ethernet dispatch tables from JSON file `%s'.",
- table_file
- )
- for eth_iface, data in iteritems(eth_table_data):
- if eth_iface not in eth_dispatchers:
- self.log.warning(
- "Request to preload eth dispatcher table for "
- "iface `{}', but no such interface is "
- "registered. Known interfaces: {}".format(
- str(eth_iface),
- ",".join(eth_dispatchers.keys())
- )
- )
- continue
- eth_dispatcher = eth_dispatchers[eth_iface]
- self.log.debug("Preloading {} dispatch table".format(eth_iface))
- try:
- for dst_addr, udp_data in iteritems(data):
- sid = SID()
- sid.set_dst_addr(int(dst_addr))
- eth_dispatcher.set_route(
- sid,
- udp_data['ip_addr'],
- udp_data['port'],
- udp_data.get('mac_addr', None)
- )
- except ValueError as ex:
- self.log.warning(
- "Bad values in preloading table file: %s",
- str(ex)
- )
+ Returns a list of dictionaries for returning by
+ PeriphManagerBase.get_chdr_link_options().
- def get_xbar_dev(self, iface):
- """
- Given an Ethernet interface (e.g., 'eth1') returns the crossbar device
- it is connected to.
+ Note: This requires a claim, which means that init() was called, and
+ deinit() was not yet called.
"""
- xbar_idx = self.iface_config[iface]['xbar']
- return "/dev/crossbar{}".format(xbar_idx)
-
- def request_xport(
- self,
- sid,
- xport_type,
- alloc_limit=2
- ):
- """
- Return UDP xport info
- """
- def fixup_sid(sid, iface_name):
- " Modify the source SID (e.g. the UHD SID) "
- if sid.src_addr <= self.max_ctrl_addr:
- sid.src_addr = self.iface_config[iface_name]['ctrl_src_addr']
- return sid
-
- def sort_xport_info(xport):
- """
- We sort xport_info (which is a list of xport) as follows:
- 1. Look at current allocation of xport src_addr (which is the addr
- of host). If the allocation too large, in this case larger or
- equal to 2 (since 2*125 = 250MS/s = max bandwidth of SFP+ port),
- we will use the allocation for sorting.
- 2. Else, we need to look at the destination block. The priority will
- yield to the xport that has the previous destination block
- that is the same as this coming destination block.
- Note: smaller number return is the higher chance to be picked
- """
- sid = SID(xport['send_sid'])
- src_addr = sid.src_addr
- prev_block = -1
- if src_addr in self._previous_block_ep:
- prev_block = self._previous_block_ep[src_addr]
- allocation = int(xport['allocation'])
- if allocation >= alloc_limit:
- return allocation
- else:
- return allocation if prev_block != sid.get_dst_block() else -1;
-
- assert xport_type in ('CTRL', 'ASYNC_MSG', 'TX_DATA', 'RX_DATA')
- allocation_getter = lambda iface: {
- 'CTRL': 0,
- 'ASYNC_MSG': 0,
- 'RX_DATA': self._allocations.get(iface, {}).get('rx', 0),
- 'TX_DATA': self._allocations.get(iface, {}).get('tx', 0),
- }[xport_type]
- xport_info = sorted([
+ return [
{
- 'type': 'UDP',
'ipv4': str(iface_info['ip_addr']),
'port': str(self.chdr_port),
- 'send_sid': str(fixup_sid(sid, iface_name)),
- 'allocation': str(allocation_getter(iface_name)),
- 'xport_type': xport_type,
- 'link_speed': str(iface_info['link_speed'])
+ 'link_rate': str(int(iface_info['link_speed'] * 1e9 / 8))
}
for iface_name, iface_info in iteritems(self._chdr_ifaces)
]
- , key=lambda x: sort_xport_info(x)
- , reverse=False)
- return xport_info
-
- def commit_xport(self, sid, xport_info):
- """
- Commit transport
-
- Saves the transport configuration to the device.
- Returns the status of the commit.
- """
- self.log.trace("Sanity checking xport_info %s...", str(xport_info))
- assert xport_info['type'] == 'UDP'
- assert any([xport_info['ipv4'] == x['ip_addr']
- for x in itervalues(self._chdr_ifaces)])
- assert xport_info['port'] == str(self.chdr_port)
- assert len(xport_info.get('src_ipv4')) > 5
- assert int(xport_info.get('src_port')) > 0
- sender_addr = xport_info['src_ipv4']
- sender_port = int(xport_info['src_port'])
- self.log.trace("Incoming connection is coming from %s:%d",
- sender_addr, sender_port)
- mac_addr = net.get_mac_addr(sender_addr)
- if mac_addr is None:
- raise RuntimeError(
- "Could not find MAC address for IP address {}".format(
- sender_addr))
- self.log.trace("Incoming connection is coming from %s",
- mac_addr)
- eth_iface = net.ip_addr_to_iface(xport_info['ipv4'], self._chdr_ifaces)
- xbar_port = self.iface_config[eth_iface]['xbar_port']
- self.log.trace("Using Ethernet interface %s, crossbar port %d",
- eth_iface, xbar_port)
- xbar_iface = lib.xbar.xbar(self.get_xbar_dev(eth_iface))
- xbar_iface.set_route(sid.src_addr, xbar_port)
- self._eth_dispatchers[eth_iface].set_route(
- sid.reversed(), sender_addr, sender_port)
- self.log.trace("UDP transport successfully committed!")
- self._previous_block_ep[sid.src_addr] = sid.get_dst_block()
- if xport_info.get('xport_type') == 'TX_DATA':
- self._allocations[eth_iface] = \
- {'tx': self._allocations.get(eth_iface, {}).get('tx', 0) + 1}
- if xport_info.get('xport_type') == 'RX_DATA':
- self._allocations[eth_iface] = \
- {'rx': self._allocations.get(eth_iface, {}).get('rx', 0) + 1}
- self.log.trace(
- "New link allocations for %s: TX: %d RX: %d",
- eth_iface,
- self._allocations.get(eth_iface, {}).get('tx', 0),
- self._allocations.get(eth_iface, {}).get('rx', 0),
- )
- return True
-